Crate diatomic_waker
Async, fast synchronization primitives for task wakeup.
diatomic-waker is similar to atomic-waker in that it enables concurrent updates and notifications to a wrapped Waker. Unlike the latter, however, it does not use spinlocks [1] and is faster, especially when the consumer is notified periodically rather than just once. It can notably be used as a very fast, single-consumer eventcount to turn a non-blocking data structure into an asynchronous one (see the MPSC channel receiver example).
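The eventcount pattern mentioned above can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: WakerSlot, Shared, WaitNonZero, block_on and demo are hypothetical names introduced for this sketch, and the slot is protected by a Mutex for brevity, which is exactly the kind of locking that diatomic-waker is designed to avoid. The sketch shows a consumer that registers its waker before checking a non-blocking data structure (here an atomic integer where 0 means empty) and a producer that publishes a value before notifying.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Hypothetical stand-in for a waker slot. It uses a mutex for brevity and
// consumes the waker on notification; the real crate is lock-free.
#[derive(Default)]
struct WakerSlot(Mutex<Option<Waker>>);

impl WakerSlot {
    fn register(&self, waker: &Waker) {
        *self.0.lock().unwrap() = Some(waker.clone());
    }

    fn notify(&self) {
        // Take the waker out first so the lock is released before waking.
        let waker = self.0.lock().unwrap().take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

// The non-blocking data structure (0 means empty) plus its waker slot.
struct Shared {
    value: AtomicUsize,
    slot: WakerSlot,
}

// Future that resolves once `value` becomes non-zero.
struct WaitNonZero(Arc<Shared>);

impl Future for WaitNonZero {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        // Register before checking, so that a value published after the
        // check is guaranteed to trigger a wake-up.
        self.0.slot.register(cx.waker());
        match self.0.value.swap(0, Ordering::Acquire) {
            0 => Poll::Pending,
            value => Poll::Ready(value),
        }
    }
}

// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal single-future executor: poll, then park until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
        thread::park();
    }
}

// A producer thread publishes a value, then notifies the consumer.
fn demo() -> usize {
    let shared = Arc::new(Shared {
        value: AtomicUsize::new(0),
        slot: WakerSlot::default(),
    });
    let producer = {
        let shared = shared.clone();
        thread::spawn(move || {
            shared.value.store(42, Ordering::Release);
            shared.slot.notify();
        })
    };
    let received = block_on(WaitNonZero(shared));
    producer.join().unwrap();
    received
}
```

Calling demo() returns the value 42 sent by the producer, whichever thread runs first. The ordering of operations is the important part: the producer stores before notifying and the consumer registers before checking, so a notification can be redundant but is never lost.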
The API distinguishes between the entity that registers wakers (WakeSink or WakeSinkRef) and the possibly many entities that notify the waker (WakeSources or WakeSourceRefs).
Most users will prefer to use WakeSink and WakeSource, which readily store a shared DiatomicWaker within an Arc. You may otherwise elect to allocate a DiatomicWaker yourself, but will then need to use the lifetime-bounded WakeSinkRef and WakeSourceRef, or ensure by other means that waker registration is not performed concurrently.
§Feature flags
By default, this crate enables the alloc feature to provide the owned WakeSink and WakeSource. It can be made no_std-compatible by specifying default-features = false.
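As a configuration sketch, disabling the default features in a dependent crate's Cargo.toml could look like the fragment below. The version requirement is a placeholder, not a recommendation; replace it with the release you actually target. With the alloc feature disabled, only the non-owned WakeSinkRef and WakeSourceRef are expected to be available.

```toml
[dependencies]
# "*" is a placeholder version requirement used for illustration only.
diatomic-waker = { version = "*", default-features = false }
```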
§Examples
A multi-producer, single-consumer channel of capacity 1 for sending NonZeroUsize values, with an asynchronous receiver:
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use diatomic_waker::{WakeSink, WakeSource};

// The sending side of the channel.
#[derive(Clone)]
struct Sender {
    wake_src: WakeSource,
    value: Arc<AtomicUsize>,
}

// The receiving side of the channel.
struct Receiver {
    wake_sink: WakeSink,
    value: Arc<AtomicUsize>,
}

// Creates an empty channel.
fn channel() -> (Sender, Receiver) {
    let value = Arc::new(AtomicUsize::new(0));
    let wake_sink = WakeSink::new();
    let wake_src = wake_sink.source();

    (
        Sender {
            wake_src,
            value: value.clone(),
        },
        Receiver { wake_sink, value },
    )
}

impl Sender {
    // Sends a value if the channel is empty.
    fn try_send(&self, value: NonZeroUsize) -> bool {
        let success = self
            .value
            .compare_exchange(0, value.get(), Ordering::Relaxed, Ordering::Relaxed)
            .is_ok();
        if success {
            self.wake_src.notify();
        }

        success
    }
}

impl Receiver {
    // Receives a value asynchronously.
    async fn recv(&mut self) -> NonZeroUsize {
        // Wait until the predicate returns `Some(value)`, i.e. when the atomic
        // value becomes non-zero.
        self.wake_sink
            .wait_until(|| NonZeroUsize::new(self.value.swap(0, Ordering::Relaxed)))
            .await
    }
}
In some cases, it may be necessary to use the lower-level register and unregister methods rather than the wait_until convenience method.
This is how the behavior of the above recv method could be reproduced with a hand-coded future:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct Recv<'a> {
    receiver: &'a mut Receiver,
}

impl Future for Recv<'_> {
    type Output = NonZeroUsize;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<NonZeroUsize> {
        // Avoid waker registration if a value is readily available.
        let value = NonZeroUsize::new(self.receiver.value.swap(0, Ordering::Relaxed));
        if let Some(value) = value {
            return Poll::Ready(value);
        }

        // Register the waker to be polled again once a value is available.
        self.receiver.wake_sink.register(cx.waker());

        // Check again after registering the waker to prevent a race condition.
        let value = NonZeroUsize::new(self.receiver.value.swap(0, Ordering::Relaxed));
        if let Some(value) = value {
            // Avoid a spurious wake-up.
            self.receiver.wake_sink.unregister();
            return Poll::Ready(value);
        }

        Poll::Pending
    }
}
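The effect of the unregister call in the hand-coded future can be illustrated in isolation. The sketch below relies only on the standard library and on hypothetical stand-ins (CountingWaker, WakerSlot, wake_counts); in particular, WakerSlot is a simplified, mutex-based model and does not reflect how the crate stores or wakes its waker. It counts how many times a waker is woken when a notification arrives with the waker registered, and again when the waker was unregistered beforehand.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

// Waker that only counts how many times it has been woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
}

// Hypothetical, mutex-based model of a waker slot (not the crate's design).
#[derive(Default)]
struct WakerSlot(Mutex<Option<Waker>>);

impl WakerSlot {
    fn register(&self, waker: &Waker) {
        *self.0.lock().unwrap() = Some(waker.clone());
    }

    fn unregister(&self) {
        *self.0.lock().unwrap() = None;
    }

    fn notify(&self) {
        // Release the lock before waking.
        let waker = self.0.lock().unwrap().take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

// Returns the wake count after a plain notification and after a
// notification that follows an unregistration.
fn wake_counts() -> (usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let slot = WakerSlot::default();

    // A registered waker is woken by a notification.
    slot.register(&waker);
    slot.notify();
    let after_notify = counter.0.load(Ordering::Relaxed);

    // Once unregistered, a later notification wakes nobody.
    slot.register(&waker);
    slot.unregister();
    slot.notify();
    let after_unregister = counter.0.load(Ordering::Relaxed);

    (after_notify, after_unregister)
}
```

In this model, wake_counts() returns (1, 1): the second notification is ignored because the waker was removed first. This mirrors the situation in poll where the value was already obtained by the second check, so a wake-up scheduled by the sender would only cause a needless extra poll.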
[1] The implementation of AtomicWaker yields to the runtime on contention, which is in effect an executor-mediated spinlock.
Modules§
- primitives (deprecated): Primitives for task wakeup.
Structs§
- DiatomicWaker: A primitive that can send or await notifications.
- WaitUntil: A future that can be awaited until a predicate is satisfied.
- WakeSink (alloc feature): An owned object that can await notifications from one or several WakeSources.
- WakeSinkRef: A non-owned object that can await notifications from one or several WakeSourceRefs.
- WakeSource (alloc feature): An owned object that can send notifications to a WakeSink.
- WakeSourceRef: A non-owned object that can send notifications to a WakeSinkRef.