tokio_sync/
semaphore.rs

1//! Thread-safe, asynchronous counting semaphore.
2//!
3//! A `Semaphore` instance holds a set of permits. Permits are used to
4//! synchronize access to a shared resource.
5//!
6//! Before accessing the shared resource, callers acquire a permit from the
7//! semaphore. Once the permit is acquired, the caller then enters the critical
8//! section. If no permits are available, then acquiring the semaphore returns
9//! `NotReady`. The task is notified once a permit becomes available.
10
11use loom::{
12    futures::AtomicTask,
13    sync::{
14        atomic::{AtomicPtr, AtomicUsize},
15        CausalCell,
16    },
17    yield_now,
18};
19
20use futures::Poll;
21
22use std::fmt;
23use std::ptr::{self, NonNull};
24use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
25use std::sync::Arc;
26use std::usize;
27
28/// Futures-aware semaphore.
29pub struct Semaphore {
30    /// Tracks both the waiter queue tail pointer and the number of remaining
31    /// permits.
32    state: AtomicUsize,
33
34    /// waiter queue head pointer.
35    head: CausalCell<NonNull<WaiterNode>>,
36
37    /// Coordinates access to the queue head.
38    rx_lock: AtomicUsize,
39
40    /// Stub waiter node used as part of the MPSC channel algorithm.
41    stub: Box<WaiterNode>,
42}
43
44/// A semaphore permit
45///
46/// Tracks the lifecycle of a semaphore permit.
47///
48/// An instance of `Permit` is intended to be used with a **single** instance of
49/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore
50/// instances will result in unexpected behavior.
51///
52/// `Permit` does **not** release the permit back to the semaphore on drop. It
53/// is the user's responsibility to ensure that `Permit::release` is called
54/// before dropping the permit.
55#[derive(Debug)]
56pub struct Permit {
57    waiter: Option<Arc<WaiterNode>>,
58    state: PermitState,
59}
60
61/// Error returned by `Permit::poll_acquire`.
62#[derive(Debug)]
63pub struct AcquireError(());
64
65/// Error returned by `Permit::try_acquire`.
66#[derive(Debug)]
67pub struct TryAcquireError {
68    kind: ErrorKind,
69}
70
71#[derive(Debug)]
72enum ErrorKind {
73    Closed,
74    NoPermits,
75}
76
77/// Node used to notify the semaphore waiter when permit is available.
78#[derive(Debug)]
79struct WaiterNode {
80    /// Stores waiter state.
81    ///
82    /// See `NodeState` for more details.
83    state: AtomicUsize,
84
85    /// Task to notify when a permit is made available.
86    task: AtomicTask,
87
88    /// Next pointer in the queue of waiting senders.
89    next: AtomicPtr<WaiterNode>,
90}
91
92/// Semaphore state
93///
94/// The 2 low bits track the modes.
95///
96/// - Closed
97/// - Full
98///
99/// When not full, the rest of the `usize` tracks the total number of messages
100/// in the channel. When full, the rest of the `usize` is a pointer to the tail
101/// of the "waiting senders" queue.
102#[derive(Copy, Clone)]
103struct SemState(usize);
104
105/// Permit state
106#[derive(Debug, Copy, Clone, Eq, PartialEq)]
107enum PermitState {
108    /// The permit has not been requested.
109    Idle,
110
111    /// Currently waiting for a permit to be made available and assigned to the
112    /// waiter.
113    Waiting,
114
115    /// The permit has been acquired.
116    Acquired,
117}
118
119/// Waiter node state
120#[derive(Debug, Copy, Clone, Eq, PartialEq)]
121#[repr(usize)]
122enum NodeState {
123    /// Not waiting for a permit and the node is not in the wait queue.
124    ///
125    /// This is the initial state.
126    Idle = 0,
127
128    /// Not waiting for a permit but the node is in the wait queue.
129    ///
130    /// This happens when the waiter has previously requested a permit, but has
131    /// since canceled the request. The node cannot be removed by the waiter, so
132    /// this state informs the receiver to skip the node when it pops it from
133    /// the wait queue.
134    Queued = 1,
135
136    /// Waiting for a permit and the node is in the wait queue.
137    QueuedWaiting = 2,
138
139    /// The waiter has been assigned a permit and the node has been removed from
140    /// the queue.
141    Assigned = 3,
142
143    /// The semaphore has been closed. No more permits will be issued.
144    Closed = 4,
145}
146
147// ===== impl Semaphore =====
148
149impl Semaphore {
150    /// Creates a new semaphore with the initial number of permits
151    ///
152    /// # Panics
153    ///
154    /// Panics if `permits` is zero.
155    pub fn new(permits: usize) -> Semaphore {
156        let stub = Box::new(WaiterNode::new());
157        let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap();
158
159        // Allocations are aligned
160        debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0);
161
162        let state = SemState::new(permits, &stub);
163
164        Semaphore {
165            state: AtomicUsize::new(state.to_usize()),
166            head: CausalCell::new(ptr),
167            rx_lock: AtomicUsize::new(0),
168            stub,
169        }
170    }
171
172    /// Returns the current number of available permits
173    pub fn available_permits(&self) -> usize {
174        let curr = SemState::load(&self.state, Acquire);
175        curr.available_permits()
176    }
177
178    /// Poll for a permit
179    fn poll_permit(&self, mut permit: Option<&mut Permit>) -> Poll<(), AcquireError> {
180        use futures::Async::*;
181
182        // Load the current state
183        let mut curr = SemState::load(&self.state, Acquire);
184
185        debug!(" + poll_permit; sem-state = {:?}", curr);
186
187        // Tracks a *mut WaiterNode representing an Arc clone.
188        //
189        // This avoids having to bump the ref count unless required.
190        let mut maybe_strong: Option<NonNull<WaiterNode>> = None;
191
192        macro_rules! undo_strong {
193            () => {
194                if let Some(waiter) = maybe_strong {
195                    // The waiter was cloned, but never got queued.
196                    // Before entering `poll_permit`, the waiter was in the
197                    // `Idle` state. We must transition the node back to the
198                    // idle state.
199                    let waiter = unsafe { Arc::from_raw(waiter.as_ptr()) };
200                    waiter.revert_to_idle();
201                }
202            };
203        }
204
205        loop {
206            let mut next = curr;
207
208            if curr.is_closed() {
209                undo_strong!();
210                return Err(AcquireError::closed());
211            }
212
213            if !next.acquire_permit(&self.stub) {
214                debug!(" + poll_permit -- no permits");
215
216                debug_assert!(curr.waiter().is_some());
217
218                if maybe_strong.is_none() {
219                    if let Some(ref mut permit) = permit {
220                        // Get the Sender's waiter node, or initialize one
221                        let waiter = permit
222                            .waiter
223                            .get_or_insert_with(|| Arc::new(WaiterNode::new()));
224
225                        waiter.register();
226
227                        debug!(" + poll_permit -- to_queued_waiting");
228
229                        if !waiter.to_queued_waiting() {
230                            debug!(" + poll_permit; waiter already queued");
231                            // The node is alrady queued, there is no further work
232                            // to do.
233                            return Ok(NotReady);
234                        }
235
236                        maybe_strong = Some(WaiterNode::into_non_null(waiter.clone()));
237                    } else {
238                        // If no `waiter`, then the task is not registered and there
239                        // is no further work to do.
240                        return Ok(NotReady);
241                    }
242                }
243
244                next.set_waiter(maybe_strong.unwrap());
245            }
246
247            debug!(" + poll_permit -- pre-CAS; next = {:?}", next);
248
249            debug_assert_ne!(curr.0, 0);
250            debug_assert_ne!(next.0, 0);
251
252            match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
253                Ok(_) => {
254                    debug!(" + poll_permit -- CAS ok");
255                    match curr.waiter() {
256                        Some(prev_waiter) => {
257                            let waiter = maybe_strong.unwrap();
258
259                            // Finish pushing
260                            unsafe {
261                                prev_waiter.as_ref().next.store(waiter.as_ptr(), Release);
262                            }
263
264                            debug!(" + poll_permit -- waiter pushed");
265
266                            return Ok(NotReady);
267                        }
268                        None => {
269                            debug!(" + poll_permit -- permit acquired");
270
271                            undo_strong!();
272
273                            return Ok(Ready(()));
274                        }
275                    }
276                }
277                Err(actual) => {
278                    curr = actual;
279                }
280            }
281        }
282    }
283
284    /// Close the semaphore. This prevents the semaphore from issuing new
285    /// permits and notifies all pending waiters.
286    pub fn close(&self) {
287        debug!("+ Semaphore::close");
288
289        // Acquire the `rx_lock`, setting the "closed" flag on the lock.
290        let prev = self.rx_lock.fetch_or(1, AcqRel);
291        debug!(" + close -- rx_lock.fetch_add(1)");
292
293        if prev != 0 {
294            debug!("+ close -- locked; prev = {}", prev);
295            // Another thread has the lock and will be responsible for notifying
296            // pending waiters.
297            return;
298        }
299
300        self.add_permits_locked(0, true);
301    }
302
303    /// Add `n` new permits to the semaphore.
304    pub fn add_permits(&self, n: usize) {
305        debug!(" + add_permits; n = {}", n);
306
307        if n == 0 {
308            return;
309        }
310
311        // TODO: Handle overflow. A panic is not sufficient, the process must
312        // abort.
313        let prev = self.rx_lock.fetch_add(n << 1, AcqRel);
314        debug!(" + add_permits; rx_lock.fetch_add(n << 1); n = {}", n);
315
316        if prev != 0 {
317            debug!(" + add_permits -- locked; prev = {}", prev);
318            // Another thread has the lock and will be responsible for notifying
319            // pending waiters.
320            return;
321        }
322
323        self.add_permits_locked(n, false);
324    }
325
326    fn add_permits_locked(&self, mut rem: usize, mut closed: bool) {
327        while rem > 0 || closed {
328            debug!(
329                " + add_permits_locked -- iter; rem = {}; closed = {:?}",
330                rem, closed
331            );
332
333            if closed {
334                SemState::fetch_set_closed(&self.state, AcqRel);
335            }
336
337            // Release the permits and notify
338            self.add_permits_locked2(rem, closed);
339
340            let n = rem << 1;
341
342            let actual = if closed {
343                let actual = self.rx_lock.fetch_sub(n | 1, AcqRel);
344                debug!(
345                    " + add_permits_locked; rx_lock.fetch_sub(n | 1); n = {}; actual={}",
346                    n, actual
347                );
348
349                closed = false;
350                actual
351            } else {
352                let actual = self.rx_lock.fetch_sub(n, AcqRel);
353                debug!(
354                    " + add_permits_locked; rx_lock.fetch_sub(n); n = {}; actual={}",
355                    n, actual
356                );
357
358                closed = actual & 1 == 1;
359                actual
360            };
361
362            rem = (actual >> 1) - rem;
363        }
364
365        debug!(" + add_permits; done");
366    }
367
368    /// Release a specific amount of permits to the semaphore
369    ///
370    /// This function is called by `add_permits` after the add lock has been
371    /// acquired.
372    fn add_permits_locked2(&self, mut n: usize, closed: bool) {
373        while n > 0 || closed {
374            let waiter = match self.pop(n, closed) {
375                Some(waiter) => waiter,
376                None => {
377                    return;
378                }
379            };
380
381            debug!(" + release_n -- notify");
382
383            if waiter.notify(closed) {
384                n = n.saturating_sub(1);
385                debug!(" + release_n -- dec");
386            }
387        }
388    }
389
390    /// Pop a waiter
391    ///
392    /// `rem` represents the remaining number of times the caller will pop. If
393    /// there are no more waiters to pop, `rem` is used to set the available
394    /// permits.
395    fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> {
396        debug!(" + pop; rem = {}", rem);
397
398        'outer: loop {
399            unsafe {
400                let mut head = self.head.with(|head| *head);
401                let mut next_ptr = head.as_ref().next.load(Acquire);
402
403                let stub = self.stub();
404
405                if head == stub {
406                    debug!(" + pop; head == stub");
407
408                    let next = match NonNull::new(next_ptr) {
409                        Some(next) => next,
410                        None => {
411                            // This loop is not part of the standard intrusive mpsc
412                            // channel algorithm. This is where we atomically pop
413                            // the last task and add `rem` to the remaining capacity.
414                            //
415                            // This modification to the pop algorithm works because,
416                            // at this point, we have not done any work (only done
417                            // reading). We have a *pretty* good idea that there is
418                            // no concurrent pusher.
419                            //
420                            // The capacity is then atomically added by doing an
421                            // AcqRel CAS on `state`. The `state` cell is the
422                            // linchpin of the algorithm.
423                            //
424                            // By successfully CASing `head` w/ AcqRel, we ensure
425                            // that, if any thread was racing and entered a push, we
426                            // see that and abort pop, retrying as it is
427                            // "inconsistent".
428                            let mut curr = SemState::load(&self.state, Acquire);
429
430                            loop {
431                                if curr.has_waiter(&self.stub) {
432                                    // Inconsistent
433                                    debug!(" + pop; inconsistent 1");
434                                    yield_now();
435                                    continue 'outer;
436                                }
437
438                                // When closing the semaphore, nodes are popped
439                                // with `rem == 0`. In this case, we are not
440                                // adding permits, but notifying waiters of the
441                                // semaphore's closed state.
442                                if rem == 0 {
443                                    debug_assert!(curr.is_closed(), "state = {:?}", curr);
444                                    return None;
445                                }
446
447                                let mut next = curr;
448                                next.release_permits(rem, &self.stub);
449
450                                match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
451                                    Ok(_) => return None,
452                                    Err(actual) => {
453                                        curr = actual;
454                                    }
455                                }
456                            }
457                        }
458                    };
459
460                    debug!(" + pop; got next waiter");
461
462                    self.head.with_mut(|head| *head = next);
463                    head = next;
464                    next_ptr = next.as_ref().next.load(Acquire);
465                }
466
467                if let Some(next) = NonNull::new(next_ptr) {
468                    self.head.with_mut(|head| *head = next);
469
470                    return Some(Arc::from_raw(head.as_ptr()));
471                }
472
473                let state = SemState::load(&self.state, Acquire);
474
475                // This must always be a pointer as the wait list is not empty.
476                let tail = state.waiter().unwrap();
477
478                if tail != head {
479                    // Inconsistent
480                    debug!(" + pop; inconsistent 2");
481                    yield_now();
482                    continue 'outer;
483                }
484
485                self.push_stub(closed);
486
487                next_ptr = head.as_ref().next.load(Acquire);
488
489                if let Some(next) = NonNull::new(next_ptr) {
490                    self.head.with_mut(|head| *head = next);
491
492                    return Some(Arc::from_raw(head.as_ptr()));
493                }
494
495                // Inconsistent state, loop
496                debug!(" + pop; inconsistent 3");
497                yield_now();
498            }
499        }
500    }
501
502    unsafe fn push_stub(&self, closed: bool) {
503        let stub = self.stub();
504
505        // Set the next pointer. This does not require an atomic operation as
506        // this node is not accessible. The write will be flushed with the next
507        // operation
508        stub.as_ref().next.store(ptr::null_mut(), Relaxed);
509
510        // Update the tail to point to the new node. We need to see the previous
511        // node in order to update the next pointer as well as release `task`
512        // to any other threads calling `push`.
513        let prev = SemState::new_ptr(stub, closed).swap(&self.state, AcqRel);
514
515        debug_assert_eq!(closed, prev.is_closed());
516
517        // The stub is only pushed when there are pending tasks. Because of
518        // this, the state must *always* be in pointer mode.
519        let prev = prev.waiter().unwrap();
520
521        // We don't want the *existing* pointer to be a stub.
522        debug_assert_ne!(prev, stub);
523
524        // Release `task` to the consume end.
525        prev.as_ref().next.store(stub.as_ptr(), Release);
526    }
527
528    fn stub(&self) -> NonNull<WaiterNode> {
529        unsafe { NonNull::new_unchecked(&*self.stub as *const _ as *mut _) }
530    }
531}
532
533impl fmt::Debug for Semaphore {
534    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
535        fmt.debug_struct("Semaphore")
536            .field("state", &SemState::load(&self.state, Relaxed))
537            .field("head", &self.head.with(|ptr| ptr))
538            .field("rx_lock", &self.rx_lock.load(Relaxed))
539            .field("stub", &self.stub)
540            .finish()
541    }
542}
543
544unsafe impl Send for Semaphore {}
545unsafe impl Sync for Semaphore {}
546
547// ===== impl Permit =====
548
549impl Permit {
550    /// Create a new `Permit`.
551    ///
552    /// The permit begins in the "unacquired" state.
553    ///
554    /// # Examples
555    ///
556    /// ```
557    /// use tokio_sync::semaphore::Permit;
558    ///
559    /// let permit = Permit::new();
560    /// assert!(!permit.is_acquired());
561    /// ```
562    pub fn new() -> Permit {
563        Permit {
564            waiter: None,
565            state: PermitState::Idle,
566        }
567    }
568
569    /// Returns true if the permit has been acquired
570    pub fn is_acquired(&self) -> bool {
571        self.state == PermitState::Acquired
572    }
573
574    /// Try to acquire the permit. If no permits are available, the current task
575    /// is notified once a new permit becomes available.
576    pub fn poll_acquire(&mut self, semaphore: &Semaphore) -> Poll<(), AcquireError> {
577        use futures::Async::*;
578
579        match self.state {
580            PermitState::Idle => {}
581            PermitState::Waiting => {
582                let waiter = self.waiter.as_ref().unwrap();
583
584                if waiter.acquire()? {
585                    self.state = PermitState::Acquired;
586                    return Ok(Ready(()));
587                } else {
588                    return Ok(NotReady);
589                }
590            }
591            PermitState::Acquired => {
592                return Ok(Ready(()));
593            }
594        }
595
596        match semaphore.poll_permit(Some(self))? {
597            Ready(v) => {
598                self.state = PermitState::Acquired;
599                Ok(Ready(v))
600            }
601            NotReady => {
602                self.state = PermitState::Waiting;
603                Ok(NotReady)
604            }
605        }
606    }
607
608    /// Try to acquire the permit.
609    pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> {
610        use futures::Async::*;
611
612        match self.state {
613            PermitState::Idle => {}
614            PermitState::Waiting => {
615                let waiter = self.waiter.as_ref().unwrap();
616
617                if waiter.acquire2().map_err(to_try_acquire)? {
618                    self.state = PermitState::Acquired;
619                    return Ok(());
620                } else {
621                    return Err(TryAcquireError::no_permits());
622                }
623            }
624            PermitState::Acquired => {
625                return Ok(());
626            }
627        }
628
629        match semaphore.poll_permit(None).map_err(to_try_acquire)? {
630            Ready(()) => {
631                self.state = PermitState::Acquired;
632                Ok(())
633            }
634            NotReady => Err(TryAcquireError::no_permits()),
635        }
636    }
637
638    /// Release a permit back to the semaphore
639    pub fn release(&mut self, semaphore: &Semaphore) {
640        if self.forget2() {
641            semaphore.add_permits(1);
642        }
643    }
644
645    /// Forget the permit **without** releasing it back to the semaphore.
646    ///
647    /// After calling `forget`, `poll_acquire` is able to acquire new permit
648    /// from the sempahore.
649    ///
650    /// Repeatedly calling `forget` without associated calls to `add_permit`
651    /// will result in the semaphore losing all permits.
652    pub fn forget(&mut self) {
653        self.forget2();
654    }
655
656    /// Returns `true` if the permit was acquired
657    fn forget2(&mut self) -> bool {
658        match self.state {
659            PermitState::Idle => false,
660            PermitState::Waiting => {
661                let ret = self.waiter.as_ref().unwrap().cancel_interest();
662                self.state = PermitState::Idle;
663                ret
664            }
665            PermitState::Acquired => {
666                self.state = PermitState::Idle;
667                true
668            }
669        }
670    }
671}
672
673// ===== impl AcquireError ====
674
675impl AcquireError {
676    fn closed() -> AcquireError {
677        AcquireError(())
678    }
679}
680
681fn to_try_acquire(_: AcquireError) -> TryAcquireError {
682    TryAcquireError::closed()
683}
684
685impl fmt::Display for AcquireError {
686    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
687        use std::error::Error;
688        write!(fmt, "{}", self.description())
689    }
690}
691
692impl ::std::error::Error for AcquireError {
693    fn description(&self) -> &str {
694        "semaphore closed"
695    }
696}
697
698// ===== impl TryAcquireError =====
699
700impl TryAcquireError {
701    fn closed() -> TryAcquireError {
702        TryAcquireError {
703            kind: ErrorKind::Closed,
704        }
705    }
706
707    fn no_permits() -> TryAcquireError {
708        TryAcquireError {
709            kind: ErrorKind::NoPermits,
710        }
711    }
712
713    /// Returns true if the error was caused by a closed semaphore.
714    pub fn is_closed(&self) -> bool {
715        match self.kind {
716            ErrorKind::Closed => true,
717            _ => false,
718        }
719    }
720
721    /// Returns true if the error was caused by calling `try_acquire` on a
722    /// semaphore with no available permits.
723    pub fn is_no_permits(&self) -> bool {
724        match self.kind {
725            ErrorKind::NoPermits => true,
726            _ => false,
727        }
728    }
729}
730
731impl fmt::Display for TryAcquireError {
732    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
733        use std::error::Error;
734        write!(fmt, "{}", self.description())
735    }
736}
737
738impl ::std::error::Error for TryAcquireError {
739    fn description(&self) -> &str {
740        match self.kind {
741            ErrorKind::Closed => "semaphore closed",
742            ErrorKind::NoPermits => "no permits available",
743        }
744    }
745}
746
747// ===== impl WaiterNode =====
748
749impl WaiterNode {
750    fn new() -> WaiterNode {
751        WaiterNode {
752            state: AtomicUsize::new(NodeState::new().to_usize()),
753            task: AtomicTask::new(),
754            next: AtomicPtr::new(ptr::null_mut()),
755        }
756    }
757
758    fn acquire(&self) -> Result<bool, AcquireError> {
759        if self.acquire2()? {
760            return Ok(true);
761        }
762
763        self.task.register();
764
765        self.acquire2()
766    }
767
768    fn acquire2(&self) -> Result<bool, AcquireError> {
769        use self::NodeState::*;
770
771        match Idle.compare_exchange(&self.state, Assigned, AcqRel, Acquire) {
772            Ok(_) => Ok(true),
773            Err(Closed) => Err(AcquireError::closed()),
774            Err(_) => Ok(false),
775        }
776    }
777
778    fn register(&self) {
779        self.task.register()
780    }
781
782    /// Returns `true` if the permit has been acquired
783    fn cancel_interest(&self) -> bool {
784        use self::NodeState::*;
785
786        match Queued.compare_exchange(&self.state, QueuedWaiting, AcqRel, Acquire) {
787            // Successfully removed interest from the queued node. The permit
788            // has not been assigned to the node.
789            Ok(_) => false,
790            // The semaphore has been closed, there is no further action to
791            // take.
792            Err(Closed) => false,
793            // The permit has been assigned. It must be acquired in order to
794            // be released back to the semaphore.
795            Err(Assigned) => {
796                match self.acquire2() {
797                    Ok(true) => true,
798                    // Not a reachable state
799                    Ok(false) => panic!(),
800                    // The semaphore has been closed, no further action to take.
801                    Err(_) => false,
802                }
803            }
804            Err(state) => panic!("unexpected state = {:?}", state),
805        }
806    }
807
808    /// Transition the state to `QueuedWaiting`.
809    ///
810    /// This step can only happen from `Queued` or from `Idle`.
811    ///
812    /// Returns `true` if transitioning into a queued state.
813    fn to_queued_waiting(&self) -> bool {
814        use self::NodeState::*;
815
816        let mut curr = NodeState::load(&self.state, Acquire);
817
818        loop {
819            debug_assert!(curr == Idle || curr == Queued, "actual = {:?}", curr);
820            let next = QueuedWaiting;
821
822            match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
823                Ok(_) => {
824                    if curr.is_queued() {
825                        return false;
826                    } else {
827                        // Transitioned to queued, reset next pointer
828                        self.next.store(ptr::null_mut(), Relaxed);
829                        return true;
830                    }
831                }
832                Err(actual) => {
833                    curr = actual;
834                }
835            }
836        }
837    }
838
839    /// Notify the waiter
840    ///
841    /// Returns `true` if the waiter accepts the notification
842    fn notify(&self, closed: bool) -> bool {
843        use self::NodeState::*;
844
845        // Assume QueuedWaiting state
846        let mut curr = QueuedWaiting;
847
848        loop {
849            let next = match curr {
850                Queued => Idle,
851                QueuedWaiting => {
852                    if closed {
853                        Closed
854                    } else {
855                        Assigned
856                    }
857                }
858                actual => panic!("actual = {:?}", actual),
859            };
860
861            match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
862                Ok(_) => match curr {
863                    QueuedWaiting => {
864                        debug!(" + notify -- task notified");
865                        self.task.notify();
866                        return true;
867                    }
868                    other => {
869                        debug!(" + notify -- not notified; state = {:?}", other);
870                        return false;
871                    }
872                },
873                Err(actual) => curr = actual,
874            }
875        }
876    }
877
878    fn revert_to_idle(&self) {
879        use self::NodeState::Idle;
880
881        // There are no other handles to the node
882        NodeState::store(&self.state, Idle, Relaxed);
883    }
884
885    fn into_non_null(arc: Arc<WaiterNode>) -> NonNull<WaiterNode> {
886        let ptr = Arc::into_raw(arc);
887        unsafe { NonNull::new_unchecked(ptr as *mut _) }
888    }
889}
890
891// ===== impl State =====
892
893/// Flag differentiating between available permits and waiter pointers.
894///
895/// If we assume pointers are properly aligned, then the least significant bit
896/// will always be zero. So, we use that bit to track if the value represents a
897/// number.
898const NUM_FLAG: usize = 0b01;
899
900const CLOSED_FLAG: usize = 0b10;
901
902const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT;
903
904/// When representing "numbers", the state has to be shifted this much (to get
905/// rid of the flag bit).
906const NUM_SHIFT: usize = 2;
907
908impl SemState {
909    /// Returns a new default `State` value.
910    fn new(permits: usize, stub: &WaiterNode) -> SemState {
911        assert!(permits <= MAX_PERMITS);
912
913        if permits > 0 {
914            SemState((permits << NUM_SHIFT) | NUM_FLAG)
915        } else {
916            SemState(stub as *const _ as usize)
917        }
918    }
919
920    /// Returns a `State` tracking `ptr` as the tail of the queue.
921    fn new_ptr(tail: NonNull<WaiterNode>, closed: bool) -> SemState {
922        let mut val = tail.as_ptr() as usize;
923
924        if closed {
925            val |= CLOSED_FLAG;
926        }
927
928        SemState(val)
929    }
930
931    /// Returns the amount of remaining capacity
932    fn available_permits(&self) -> usize {
933        if !self.has_available_permits() {
934            return 0;
935        }
936
937        self.0 >> NUM_SHIFT
938    }
939
940    /// Returns true if the state has permits that can be claimed by a waiter.
941    fn has_available_permits(&self) -> bool {
942        self.0 & NUM_FLAG == NUM_FLAG
943    }
944
945    fn has_waiter(&self, stub: &WaiterNode) -> bool {
946        !self.has_available_permits() && !self.is_stub(stub)
947    }
948
949    /// Try to acquire a permit
950    ///
951    /// # Return
952    ///
953    /// Returns `true` if the permit was acquired, `false` otherwise. If `false`
954    /// is returned, it can be assumed that `State` represents the head pointer
955    /// in the mpsc channel.
956    fn acquire_permit(&mut self, stub: &WaiterNode) -> bool {
957        if !self.has_available_permits() {
958            return false;
959        }
960
961        debug_assert!(self.waiter().is_none());
962
963        self.0 -= 1 << NUM_SHIFT;
964
965        if self.0 == NUM_FLAG {
966            // Set the state to the stub pointer.
967            self.0 = stub as *const _ as usize;
968        }
969
970        true
971    }
972
973    /// Release permits
974    ///
975    /// Returns `true` if the permits were accepted.
976    fn release_permits(&mut self, permits: usize, stub: &WaiterNode) {
977        debug_assert!(permits > 0);
978
979        if self.is_stub(stub) {
980            self.0 = (permits << NUM_SHIFT) | NUM_FLAG | (self.0 & CLOSED_FLAG);
981            return;
982        }
983
984        debug_assert!(self.has_available_permits());
985
986        self.0 += permits << NUM_SHIFT;
987    }
988
989    fn is_waiter(&self) -> bool {
990        self.0 & NUM_FLAG == 0
991    }
992
993    /// Returns the waiter, if one is set.
994    fn waiter(&self) -> Option<NonNull<WaiterNode>> {
995        if self.is_waiter() {
996            let waiter = NonNull::new(self.as_ptr()).expect("null pointer stored");
997
998            Some(waiter)
999        } else {
1000            None
1001        }
1002    }
1003
1004    /// Assumes `self` represents a pointer
1005    fn as_ptr(&self) -> *mut WaiterNode {
1006        (self.0 & !CLOSED_FLAG) as *mut WaiterNode
1007    }
1008
1009    /// Set to a pointer to a waiter.
1010    ///
1011    /// This can only be done from the full state.
1012    fn set_waiter(&mut self, waiter: NonNull<WaiterNode>) {
1013        let waiter = waiter.as_ptr() as usize;
1014        debug_assert!(waiter & NUM_FLAG == 0);
1015        debug_assert!(!self.is_closed());
1016
1017        self.0 = waiter;
1018    }
1019
1020    fn is_stub(&self, stub: &WaiterNode) -> bool {
1021        self.as_ptr() as usize == stub as *const _ as usize
1022    }
1023
1024    /// Load the state from an AtomicUsize.
1025    fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState {
1026        let value = cell.load(ordering);
1027        debug!(" + SemState::load; value = {}", value);
1028        SemState(value)
1029    }
1030
1031    /// Swap the values
1032    fn swap(&self, cell: &AtomicUsize, ordering: Ordering) -> SemState {
1033        let prev = SemState(cell.swap(self.to_usize(), ordering));
1034        debug_assert_eq!(prev.is_closed(), self.is_closed());
1035        prev
1036    }
1037
1038    /// Compare and exchange the current value into the provided cell
1039    fn compare_exchange(
1040        &self,
1041        cell: &AtomicUsize,
1042        prev: SemState,
1043        success: Ordering,
1044        failure: Ordering,
1045    ) -> Result<SemState, SemState> {
1046        debug_assert_eq!(prev.is_closed(), self.is_closed());
1047
1048        let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure);
1049
1050        debug!(
1051            " + SemState::compare_exchange; prev = {}; next = {}; result = {:?}",
1052            prev.to_usize(),
1053            self.to_usize(),
1054            res
1055        );
1056
1057        res.map(SemState).map_err(SemState)
1058    }
1059
1060    fn fetch_set_closed(cell: &AtomicUsize, ordering: Ordering) -> SemState {
1061        let value = cell.fetch_or(CLOSED_FLAG, ordering);
1062        SemState(value)
1063    }
1064
1065    fn is_closed(&self) -> bool {
1066        self.0 & CLOSED_FLAG == CLOSED_FLAG
1067    }
1068
1069    /// Converts the state into a `usize` representation.
1070    fn to_usize(&self) -> usize {
1071        self.0
1072    }
1073}
1074
1075impl fmt::Debug for SemState {
1076    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1077        let mut fmt = fmt.debug_struct("SemState");
1078
1079        if self.is_waiter() {
1080            fmt.field("state", &"<waiter>");
1081        } else {
1082            fmt.field("permits", &self.available_permits());
1083        }
1084
1085        fmt.finish()
1086    }
1087}
1088
1089// ===== impl NodeState =====
1090
1091impl NodeState {
1092    fn new() -> NodeState {
1093        NodeState::Idle
1094    }
1095
1096    fn from_usize(value: usize) -> NodeState {
1097        use self::NodeState::*;
1098
1099        match value {
1100            0 => Idle,
1101            1 => Queued,
1102            2 => QueuedWaiting,
1103            3 => Assigned,
1104            4 => Closed,
1105            _ => panic!(),
1106        }
1107    }
1108
1109    fn load(cell: &AtomicUsize, ordering: Ordering) -> NodeState {
1110        NodeState::from_usize(cell.load(ordering))
1111    }
1112
1113    /// Store a value
1114    fn store(cell: &AtomicUsize, value: NodeState, ordering: Ordering) {
1115        cell.store(value.to_usize(), ordering);
1116    }
1117
1118    fn compare_exchange(
1119        &self,
1120        cell: &AtomicUsize,
1121        prev: NodeState,
1122        success: Ordering,
1123        failure: Ordering,
1124    ) -> Result<NodeState, NodeState> {
1125        cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure)
1126            .map(NodeState::from_usize)
1127            .map_err(NodeState::from_usize)
1128    }
1129
1130    /// Returns `true` if `self` represents a queued state.
1131    fn is_queued(&self) -> bool {
1132        use self::NodeState::*;
1133
1134        match *self {
1135            Queued | QueuedWaiting => true,
1136            _ => false,
1137        }
1138    }
1139
1140    fn to_usize(&self) -> usize {
1141        *self as usize
1142    }
1143}