madsim_real_tokio/sync/
batch_semaphore.rs

1#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
//! # Implementation Details
//!
//! The semaphore is implemented using an intrusive linked list of waiters. An
//! atomic counter tracks the number of available permits. If the semaphore does
//! not contain the required number of permits, the task attempting to acquire
//! permits places its waker at the end of a queue. When new permits are made
//! available (such as by releasing an initial acquisition), they are assigned
//! to the task at the front of the queue, waking that task if its requested
//! number of permits is met.
//!
//! Because waiters are enqueued at the back of the linked list and dequeued
//! from the front, the semaphore is fair. Tasks trying to acquire large numbers
//! of permits at a time will always be woken eventually, even if many other
//! tasks are acquiring smaller numbers of permits. This means that in a
//! use-case like tokio's read-write lock, writers will not be starved by
//! readers.
18use crate::loom::cell::UnsafeCell;
19use crate::loom::sync::atomic::AtomicUsize;
20use crate::loom::sync::{Mutex, MutexGuard};
21use crate::util::linked_list::{self, LinkedList};
22#[cfg(all(tokio_unstable, feature = "tracing"))]
23use crate::util::trace;
24use crate::util::WakeList;
25
26use std::future::Future;
27use std::marker::PhantomPinned;
28use std::pin::Pin;
29use std::ptr::NonNull;
30use std::sync::atomic::Ordering::*;
31use std::task::{Context, Poll, Waker};
32use std::{cmp, fmt};
33
34/// An asynchronous counting semaphore which permits waiting on multiple permits at once.
35pub(crate) struct Semaphore {
36    waiters: Mutex<Waitlist>,
37    /// The current number of available permits in the semaphore.
38    permits: AtomicUsize,
39    #[cfg(all(tokio_unstable, feature = "tracing"))]
40    resource_span: tracing::Span,
41}
42
43struct Waitlist {
44    queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
45    closed: bool,
46}
47
/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
#[derive(Debug, PartialEq, Eq)]
pub enum TryAcquireError {
    /// The semaphore has been [closed] and cannot issue new permits.
    ///
    /// [closed]: crate::sync::Semaphore::close
    Closed,

    /// The semaphore does not currently have enough available permits to
    /// satisfy the request.
    NoPermits,
}
/// Error returned from the [`Semaphore::acquire`] function.
///
/// An `acquire` operation can only fail if the semaphore has been
/// [closed].
///
/// The private unit field keeps the type from being constructed outside this
/// module.
///
/// [closed]: crate::sync::Semaphore::close
/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
#[derive(Debug)]
pub struct AcquireError(());
70
71pub(crate) struct Acquire<'a> {
72    node: Waiter,
73    semaphore: &'a Semaphore,
74    num_permits: usize,
75    queued: bool,
76}
77
78/// An entry in the wait queue.
79struct Waiter {
80    /// The current state of the waiter.
81    ///
82    /// This is either the number of remaining permits required by
83    /// the waiter, or a flag indicating that the waiter is not yet queued.
84    state: AtomicUsize,
85
86    /// The waker to notify the task awaiting permits.
87    ///
88    /// # Safety
89    ///
90    /// This may only be accessed while the wait queue is locked.
91    waker: UnsafeCell<Option<Waker>>,
92
93    /// Intrusive linked-list pointers.
94    ///
95    /// # Safety
96    ///
97    /// This may only be accessed while the wait queue is locked.
98    ///
99    /// TODO: Ideally, we would be able to use loom to enforce that
100    /// this isn't accessed concurrently. However, it is difficult to
101    /// use a `UnsafeCell` here, since the `Link` trait requires _returning_
102    /// references to `Pointers`, and `UnsafeCell` requires that checked access
103    /// take place inside a closure. We should consider changing `Pointers` to
104    /// use `UnsafeCell` internally.
105    pointers: linked_list::Pointers<Waiter>,
106
107    #[cfg(all(tokio_unstable, feature = "tracing"))]
108    ctx: trace::AsyncOpTracingCtx,
109
110    /// Should not be `Unpin`.
111    _p: PhantomPinned,
112}
113
114generate_addr_of_methods! {
115    impl<> Waiter {
116        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
117            &self.pointers
118        }
119    }
120}
121
122impl Semaphore {
123    /// The maximum number of permits which a semaphore can hold.
124    ///
125    /// Note that this reserves three bits of flags in the permit counter, but
126    /// we only actually use one of them. However, the previous semaphore
127    /// implementation used three bits, so we will continue to reserve them to
128    /// avoid a breaking change if additional flags need to be added in the
129    /// future.
130    pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3;
131    const CLOSED: usize = 1;
132    // The least-significant bit in the number of permits is reserved to use
133    // as a flag indicating that the semaphore has been closed. Consequently
134    // PERMIT_SHIFT is used to leave that bit for that purpose.
135    const PERMIT_SHIFT: usize = 1;
136
137    /// Creates a new semaphore with the initial number of permits
138    ///
139    /// Maximum number of permits on 32-bit platforms is `1<<29`.
140    pub(crate) fn new(permits: usize) -> Self {
141        assert!(
142            permits <= Self::MAX_PERMITS,
143            "a semaphore may not have more than MAX_PERMITS permits ({})",
144            Self::MAX_PERMITS
145        );
146
147        #[cfg(all(tokio_unstable, feature = "tracing"))]
148        let resource_span = {
149            let resource_span = tracing::trace_span!(
150                "runtime.resource",
151                concrete_type = "Semaphore",
152                kind = "Sync",
153                is_internal = true
154            );
155
156            resource_span.in_scope(|| {
157                tracing::trace!(
158                    target: "runtime::resource::state_update",
159                    permits = permits,
160                    permits.op = "override",
161                )
162            });
163            resource_span
164        };
165
166        Self {
167            permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
168            waiters: Mutex::new(Waitlist {
169                queue: LinkedList::new(),
170                closed: false,
171            }),
172            #[cfg(all(tokio_unstable, feature = "tracing"))]
173            resource_span,
174        }
175    }
176
177    /// Creates a new semaphore with the initial number of permits.
178    ///
179    /// Maximum number of permits on 32-bit platforms is `1<<29`.
180    #[cfg(not(all(loom, test)))]
181    pub(crate) const fn const_new(permits: usize) -> Self {
182        assert!(permits <= Self::MAX_PERMITS);
183
184        Self {
185            permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
186            waiters: Mutex::const_new(Waitlist {
187                queue: LinkedList::new(),
188                closed: false,
189            }),
190            #[cfg(all(tokio_unstable, feature = "tracing"))]
191            resource_span: tracing::Span::none(),
192        }
193    }
194
195    /// Creates a new closed semaphore with 0 permits.
196    pub(crate) fn new_closed() -> Self {
197        Self {
198            permits: AtomicUsize::new(Self::CLOSED),
199            waiters: Mutex::new(Waitlist {
200                queue: LinkedList::new(),
201                closed: true,
202            }),
203            #[cfg(all(tokio_unstable, feature = "tracing"))]
204            resource_span: tracing::Span::none(),
205        }
206    }
207
208    /// Creates a new closed semaphore with 0 permits.
209    #[cfg(not(all(loom, test)))]
210    pub(crate) const fn const_new_closed() -> Self {
211        Self {
212            permits: AtomicUsize::new(Self::CLOSED),
213            waiters: Mutex::const_new(Waitlist {
214                queue: LinkedList::new(),
215                closed: true,
216            }),
217            #[cfg(all(tokio_unstable, feature = "tracing"))]
218            resource_span: tracing::Span::none(),
219        }
220    }
221
222    /// Returns the current number of available permits.
223    pub(crate) fn available_permits(&self) -> usize {
224        self.permits.load(Acquire) >> Self::PERMIT_SHIFT
225    }
226
227    /// Adds `added` new permits to the semaphore.
228    ///
229    /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
230    pub(crate) fn release(&self, added: usize) {
231        if added == 0 {
232            return;
233        }
234
235        // Assign permits to the wait queue
236        self.add_permits_locked(added, self.waiters.lock());
237    }
238
239    /// Closes the semaphore. This prevents the semaphore from issuing new
240    /// permits and notifies all pending waiters.
241    pub(crate) fn close(&self) {
242        let mut waiters = self.waiters.lock();
243        // If the semaphore's permits counter has enough permits for an
244        // unqueued waiter to acquire all the permits it needs immediately,
245        // it won't touch the wait list. Therefore, we have to set a bit on
246        // the permit counter as well. However, we must do this while
247        // holding the lock --- otherwise, if we set the bit and then wait
248        // to acquire the lock we'll enter an inconsistent state where the
249        // permit counter is closed, but the wait list is not.
250        self.permits.fetch_or(Self::CLOSED, Release);
251        waiters.closed = true;
252        while let Some(mut waiter) = waiters.queue.pop_back() {
253            let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
254            if let Some(waker) = waker {
255                waker.wake();
256            }
257        }
258    }
259
260    /// Returns true if the semaphore is closed.
261    pub(crate) fn is_closed(&self) -> bool {
262        self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
263    }
264
265    pub(crate) fn try_acquire(&self, num_permits: usize) -> Result<(), TryAcquireError> {
266        assert!(
267            num_permits <= Self::MAX_PERMITS,
268            "a semaphore may not have more than MAX_PERMITS permits ({})",
269            Self::MAX_PERMITS
270        );
271        let num_permits = num_permits << Self::PERMIT_SHIFT;
272        let mut curr = self.permits.load(Acquire);
273        loop {
274            // Has the semaphore closed?
275            if curr & Self::CLOSED == Self::CLOSED {
276                return Err(TryAcquireError::Closed);
277            }
278
279            // Are there enough permits remaining?
280            if curr < num_permits {
281                return Err(TryAcquireError::NoPermits);
282            }
283
284            let next = curr - num_permits;
285
286            match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
287                Ok(_) => {
288                    // TODO: Instrument once issue has been solved
289                    return Ok(());
290                }
291                Err(actual) => curr = actual,
292            }
293        }
294    }
295
296    pub(crate) fn acquire(&self, num_permits: usize) -> Acquire<'_> {
297        Acquire::new(self, num_permits)
298    }
299
300    /// Release `rem` permits to the semaphore's wait list, starting from the
301    /// end of the queue.
302    ///
303    /// If `rem` exceeds the number of permits needed by the wait list, the
304    /// remainder are assigned back to the semaphore.
305    fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
306        let mut wakers = WakeList::new();
307        let mut lock = Some(waiters);
308        let mut is_empty = false;
309        while rem > 0 {
310            let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
311            'inner: while wakers.can_push() {
312                // Was the waiter assigned enough permits to wake it?
313                match waiters.queue.last() {
314                    Some(waiter) => {
315                        if !waiter.assign_permits(&mut rem) {
316                            break 'inner;
317                        }
318                    }
319                    None => {
320                        is_empty = true;
321                        // If we assigned permits to all the waiters in the queue, and there are
322                        // still permits left over, assign them back to the semaphore.
323                        break 'inner;
324                    }
325                };
326                let mut waiter = waiters.queue.pop_back().unwrap();
327                if let Some(waker) =
328                    unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
329                {
330                    wakers.push(waker);
331                }
332            }
333
334            if rem > 0 && is_empty {
335                let permits = rem;
336                assert!(
337                    permits <= Self::MAX_PERMITS,
338                    "cannot add more than MAX_PERMITS permits ({})",
339                    Self::MAX_PERMITS
340                );
341                let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
342                let prev = prev >> Self::PERMIT_SHIFT;
343                assert!(
344                    prev + permits <= Self::MAX_PERMITS,
345                    "number of added permits ({}) would overflow MAX_PERMITS ({})",
346                    rem,
347                    Self::MAX_PERMITS
348                );
349
350                // add remaining permits back
351                #[cfg(all(tokio_unstable, feature = "tracing"))]
352                self.resource_span.in_scope(|| {
353                    tracing::trace!(
354                    target: "runtime::resource::state_update",
355                    permits = rem,
356                    permits.op = "add",
357                    )
358                });
359
360                rem = 0;
361            }
362
363            drop(waiters); // release the lock
364
365            wakers.wake_all();
366        }
367
368        assert_eq!(rem, 0);
369    }
370
371    /// Decrease a semaphore's permits by a maximum of `n`.
372    ///
373    /// If there are insufficient permits and it's not possible to reduce by `n`,
374    /// return the number of permits that were actually reduced.
375    pub(crate) fn forget_permits(&self, n: usize) -> usize {
376        if n == 0 {
377            return 0;
378        }
379
380        let mut curr_bits = self.permits.load(Acquire);
381        loop {
382            let curr = curr_bits >> Self::PERMIT_SHIFT;
383            let new = curr.saturating_sub(n);
384            match self.permits.compare_exchange_weak(
385                curr_bits,
386                new << Self::PERMIT_SHIFT,
387                AcqRel,
388                Acquire,
389            ) {
390                Ok(_) => return std::cmp::min(curr, n),
391                Err(actual) => curr_bits = actual,
392            };
393        }
394    }
395
396    fn poll_acquire(
397        &self,
398        cx: &mut Context<'_>,
399        num_permits: usize,
400        node: Pin<&mut Waiter>,
401        queued: bool,
402    ) -> Poll<Result<(), AcquireError>> {
403        let mut acquired = 0;
404
405        let needed = if queued {
406            node.state.load(Acquire) << Self::PERMIT_SHIFT
407        } else {
408            num_permits << Self::PERMIT_SHIFT
409        };
410
411        let mut lock = None;
412        // First, try to take the requested number of permits from the
413        // semaphore.
414        let mut curr = self.permits.load(Acquire);
415        let mut waiters = loop {
416            // Has the semaphore closed?
417            if curr & Self::CLOSED > 0 {
418                return Poll::Ready(Err(AcquireError::closed()));
419            }
420
421            let mut remaining = 0;
422            let total = curr
423                .checked_add(acquired)
424                .expect("number of permits must not overflow");
425            let (next, acq) = if total >= needed {
426                let next = curr - (needed - acquired);
427                (next, needed >> Self::PERMIT_SHIFT)
428            } else {
429                remaining = (needed - acquired) - curr;
430                (0, curr >> Self::PERMIT_SHIFT)
431            };
432
433            if remaining > 0 && lock.is_none() {
434                // No permits were immediately available, so this permit will
435                // (probably) need to wait. We'll need to acquire a lock on the
436                // wait queue before continuing. We need to do this _before_ the
437                // CAS that sets the new value of the semaphore's `permits`
438                // counter. Otherwise, if we subtract the permits and then
439                // acquire the lock, we might miss additional permits being
440                // added while waiting for the lock.
441                lock = Some(self.waiters.lock());
442            }
443
444            match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
445                Ok(_) => {
446                    acquired += acq;
447                    if remaining == 0 {
448                        if !queued {
449                            #[cfg(all(tokio_unstable, feature = "tracing"))]
450                            self.resource_span.in_scope(|| {
451                                tracing::trace!(
452                                    target: "runtime::resource::state_update",
453                                    permits = acquired,
454                                    permits.op = "sub",
455                                );
456                                tracing::trace!(
457                                    target: "runtime::resource::async_op::state_update",
458                                    permits_obtained = acquired,
459                                    permits.op = "add",
460                                )
461                            });
462
463                            return Poll::Ready(Ok(()));
464                        } else if lock.is_none() {
465                            break self.waiters.lock();
466                        }
467                    }
468                    break lock.expect("lock must be acquired before waiting");
469                }
470                Err(actual) => curr = actual,
471            }
472        };
473
474        if waiters.closed {
475            return Poll::Ready(Err(AcquireError::closed()));
476        }
477
478        #[cfg(all(tokio_unstable, feature = "tracing"))]
479        self.resource_span.in_scope(|| {
480            tracing::trace!(
481                target: "runtime::resource::state_update",
482                permits = acquired,
483                permits.op = "sub",
484            )
485        });
486
487        if node.assign_permits(&mut acquired) {
488            self.add_permits_locked(acquired, waiters);
489            return Poll::Ready(Ok(()));
490        }
491
492        assert_eq!(acquired, 0);
493        let mut old_waker = None;
494
495        // Otherwise, register the waker & enqueue the node.
496        node.waker.with_mut(|waker| {
497            // Safety: the wait list is locked, so we may modify the waker.
498            let waker = unsafe { &mut *waker };
499            // Do we need to register the new waker?
500            if waker
501                .as_ref()
502                .map_or(true, |waker| !waker.will_wake(cx.waker()))
503            {
504                old_waker = std::mem::replace(waker, Some(cx.waker().clone()));
505            }
506        });
507
508        // If the waiter is not already in the wait queue, enqueue it.
509        if !queued {
510            let node = unsafe {
511                let node = Pin::into_inner_unchecked(node) as *mut _;
512                NonNull::new_unchecked(node)
513            };
514
515            waiters.queue.push_front(node);
516        }
517        drop(waiters);
518        drop(old_waker);
519
520        Poll::Pending
521    }
522}
523
524impl fmt::Debug for Semaphore {
525    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
526        fmt.debug_struct("Semaphore")
527            .field("permits", &self.available_permits())
528            .finish()
529    }
530}
531
532impl Waiter {
533    fn new(
534        num_permits: usize,
535        #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx,
536    ) -> Self {
537        Waiter {
538            waker: UnsafeCell::new(None),
539            state: AtomicUsize::new(num_permits),
540            pointers: linked_list::Pointers::new(),
541            #[cfg(all(tokio_unstable, feature = "tracing"))]
542            ctx,
543            _p: PhantomPinned,
544        }
545    }
546
547    /// Assign permits to the waiter.
548    ///
549    /// Returns `true` if the waiter should be removed from the queue
550    fn assign_permits(&self, n: &mut usize) -> bool {
551        let mut curr = self.state.load(Acquire);
552        loop {
553            let assign = cmp::min(curr, *n);
554            let next = curr - assign;
555            match self.state.compare_exchange(curr, next, AcqRel, Acquire) {
556                Ok(_) => {
557                    *n -= assign;
558                    #[cfg(all(tokio_unstable, feature = "tracing"))]
559                    self.ctx.async_op_span.in_scope(|| {
560                        tracing::trace!(
561                            target: "runtime::resource::async_op::state_update",
562                            permits_obtained = assign,
563                            permits.op = "add",
564                        );
565                    });
566                    return next == 0;
567                }
568                Err(actual) => curr = actual,
569            }
570        }
571    }
572}
573
574impl Future for Acquire<'_> {
575    type Output = Result<(), AcquireError>;
576
577    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
578        #[cfg(all(tokio_unstable, feature = "tracing"))]
579        let _resource_span = self.node.ctx.resource_span.clone().entered();
580        #[cfg(all(tokio_unstable, feature = "tracing"))]
581        let _async_op_span = self.node.ctx.async_op_span.clone().entered();
582        #[cfg(all(tokio_unstable, feature = "tracing"))]
583        let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered();
584
585        let (node, semaphore, needed, queued) = self.project();
586
587        // First, ensure the current task has enough budget to proceed.
588        #[cfg(all(tokio_unstable, feature = "tracing"))]
589        let coop = ready!(trace_poll_op!(
590            "poll_acquire",
591            crate::runtime::coop::poll_proceed(cx),
592        ));
593
594        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
595        let coop = ready!(crate::runtime::coop::poll_proceed(cx));
596
597        let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
598            Poll::Pending => {
599                *queued = true;
600                Poll::Pending
601            }
602            Poll::Ready(r) => {
603                coop.made_progress();
604                r?;
605                *queued = false;
606                Poll::Ready(Ok(()))
607            }
608        };
609
610        #[cfg(all(tokio_unstable, feature = "tracing"))]
611        return trace_poll_op!("poll_acquire", result);
612
613        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
614        return result;
615    }
616}
617
618impl<'a> Acquire<'a> {
619    fn new(semaphore: &'a Semaphore, num_permits: usize) -> Self {
620        #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
621        return Self {
622            node: Waiter::new(num_permits),
623            semaphore,
624            num_permits,
625            queued: false,
626        };
627
628        #[cfg(all(tokio_unstable, feature = "tracing"))]
629        return semaphore.resource_span.in_scope(|| {
630            let async_op_span =
631                tracing::trace_span!("runtime.resource.async_op", source = "Acquire::new");
632            let async_op_poll_span = async_op_span.in_scope(|| {
633                tracing::trace!(
634                    target: "runtime::resource::async_op::state_update",
635                    permits_requested = num_permits,
636                    permits.op = "override",
637                );
638
639                tracing::trace!(
640                    target: "runtime::resource::async_op::state_update",
641                    permits_obtained = 0usize,
642                    permits.op = "override",
643                );
644
645                tracing::trace_span!("runtime.resource.async_op.poll")
646            });
647
648            let ctx = trace::AsyncOpTracingCtx {
649                async_op_span,
650                async_op_poll_span,
651                resource_span: semaphore.resource_span.clone(),
652            };
653
654            Self {
655                node: Waiter::new(num_permits, ctx),
656                semaphore,
657                num_permits,
658                queued: false,
659            }
660        });
661    }
662
663    fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, usize, &mut bool) {
664        fn is_unpin<T: Unpin>() {}
665        unsafe {
666            // Safety: all fields other than `node` are `Unpin`
667
668            is_unpin::<&Semaphore>();
669            is_unpin::<&mut bool>();
670            is_unpin::<usize>();
671
672            let this = self.get_unchecked_mut();
673            (
674                Pin::new_unchecked(&mut this.node),
675                this.semaphore,
676                this.num_permits,
677                &mut this.queued,
678            )
679        }
680    }
681}
682
683impl Drop for Acquire<'_> {
684    fn drop(&mut self) {
685        // If the future is completed, there is no node in the wait list, so we
686        // can skip acquiring the lock.
687        if !self.queued {
688            return;
689        }
690
691        // This is where we ensure safety. The future is being dropped,
692        // which means we must ensure that the waiter entry is no longer stored
693        // in the linked list.
694        let mut waiters = self.semaphore.waiters.lock();
695
696        // remove the entry from the list
697        let node = NonNull::from(&mut self.node);
698        // Safety: we have locked the wait list.
699        unsafe { waiters.queue.remove(node) };
700
701        let acquired_permits = self.num_permits - self.node.state.load(Acquire);
702        if acquired_permits > 0 {
703            self.semaphore.add_permits_locked(acquired_permits, waiters);
704        }
705    }
706}
707
708// Safety: the `Acquire` future is not `Sync` automatically because it contains
709// a `Waiter`, which, in turn, contains an `UnsafeCell`. However, the
710// `UnsafeCell` is only accessed when the future is borrowed mutably (either in
711// `poll` or in `drop`). Therefore, it is safe (although not particularly
712// _useful_) for the future to be borrowed immutably across threads.
713unsafe impl Sync for Acquire<'_> {}
714
715// ===== impl AcquireError ====
716
717impl AcquireError {
718    fn closed() -> AcquireError {
719        AcquireError(())
720    }
721}
722
723impl fmt::Display for AcquireError {
724    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
725        write!(fmt, "semaphore closed")
726    }
727}
728
729impl std::error::Error for AcquireError {}
730
731// ===== impl TryAcquireError =====
732
733impl TryAcquireError {
734    /// Returns `true` if the error was caused by a closed semaphore.
735    #[allow(dead_code)] // may be used later!
736    pub(crate) fn is_closed(&self) -> bool {
737        matches!(self, TryAcquireError::Closed)
738    }
739
740    /// Returns `true` if the error was caused by calling `try_acquire` on a
741    /// semaphore with no available permits.
742    #[allow(dead_code)] // may be used later!
743    pub(crate) fn is_no_permits(&self) -> bool {
744        matches!(self, TryAcquireError::NoPermits)
745    }
746}
747
748impl fmt::Display for TryAcquireError {
749    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
750        match self {
751            TryAcquireError::Closed => write!(fmt, "semaphore closed"),
752            TryAcquireError::NoPermits => write!(fmt, "no permits available"),
753        }
754    }
755}
756
757impl std::error::Error for TryAcquireError {}
758
759/// # Safety
760///
761/// `Waiter` is forced to be !Unpin.
762unsafe impl linked_list::Link for Waiter {
763    type Handle = NonNull<Waiter>;
764    type Target = Waiter;
765
766    fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
767        *handle
768    }
769
770    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
771        ptr
772    }
773
774    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
775        Waiter::addr_of_pointers(target)
776    }
777}