// ntex_util/channel/oneshot.rs
use std::{future::poll_fn, future::Future, pin::Pin, task::Context, task::Poll};
use super::{cell::Cell, Canceled};
use crate::task::LocalWaker;
/// Creates a new one-shot channel and returns its `(Sender, Receiver)` pair.
///
/// Both halves hold a handle to the same `Inner` state, which starts with no
/// value stored and a freshly created `LocalWaker`. The rest of this module
/// relies on these being the only two handles: a strong count of 2 is read as
/// "both halves alive" and a strong count of 1 as "the peer is gone".
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Cell::new(Inner {
        value: None,
        rx_task: LocalWaker::new(),
    });

    // Tuple fields are evaluated left to right: the sender gets a cloned
    // handle first, then the original handle is moved into the receiver.
    (
        Sender {
            inner: shared.clone(),
        },
        Receiver { inner: shared },
    )
}
/// Sending half of the one-shot channel created by [`channel`].
///
/// `send` takes the sender by value, so at most one value can be sent.
#[derive(Debug)]
pub struct Sender<T> {
/// Handle to the channel state shared with the paired [`Receiver`].
inner: Cell<Inner<T>>,
}
/// Receiving half of the one-shot channel created by [`channel`].
///
/// Implements [`Future`], so it can be awaited directly; it can also be
/// driven by reference through `recv` or `poll_recv`.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Receiver<T> {
/// Handle to the channel state shared with the paired [`Sender`].
inner: Cell<Inner<T>>,
}
// Both halves are explicitly `Unpin`, whatever `T` is. The `Future` impl for
// `Receiver` only calls `poll_recv`, which takes `&self`, so no pinned
// projection to the stored value is ever made.
impl<T> Unpin for Receiver<T> {}
impl<T> Unpin for Sender<T> {}
/// State shared by the two halves of a channel.
#[derive(Debug)]
struct Inner<T> {
/// The sent value: written by `Sender::send`, taken out by `Receiver::poll_recv`.
value: Option<T>,
/// Waker slot for the receiving task: registered by `Receiver::poll_recv`
/// when it returns `Pending`, woken by the sender on `send` and on drop.
rx_task: LocalWaker,
}
impl<T> Sender<T> {
    /// Sends `val` to the paired `Receiver`, consuming the sender.
    ///
    /// Returns `Ok(())` after storing the value in the shared state and waking
    /// the receiver's task. Returns `Err(val)`, handing the value back, when
    /// the shared state is not held by exactly two handles; for a pair made
    /// by `channel` that means the receiver has already been dropped.
    pub fn send(self, val: T) -> Result<(), T> {
        // Nobody is left to read the value: give it back to the caller.
        if self.inner.strong_count() != 2 {
            return Err(val);
        }

        let shared = self.inner.get_mut();
        shared.value = Some(val);
        // Wake only after the value is stored so the receiver's next poll finds it.
        shared.rx_task.wake();
        Ok(())
    }

    /// Returns `true` when this sender holds the only remaining handle to the
    /// shared state, i.e. the paired `Receiver` has been dropped.
    pub fn is_canceled(&self) -> bool {
        let handles = self.inner.strong_count();
        handles == 1
    }
}
impl<T> Drop for Sender<T> {
    /// Wakes the receiver's task whenever the sender goes away.
    ///
    /// This runs both after a successful `send` (which consumes the sender)
    /// and when the sender is dropped without sending. In the second case the
    /// wake-up lets a pending receiver poll again and observe cancellation.
    fn drop(&mut self) {
        let shared = self.inner.get_ref();
        shared.rx_task.wake();
    }
}
impl<T> Receiver<T> {
    /// Waits until a value has been sent or the channel has been canceled.
    ///
    /// This is the by-reference counterpart of awaiting the receiver itself:
    /// it resolves to whatever `poll_recv` eventually reports as ready.
    pub async fn recv(&self) -> Result<T, Canceled> {
        let ready = poll_fn(|cx| self.poll_recv(cx));
        ready.await
    }

    /// Polls the channel once.
    ///
    /// * `Poll::Ready(Ok(val))` - a value was stored; it is taken out of the
    ///   shared state, so later polls no longer see it.
    /// * `Poll::Ready(Err(Canceled))` - no value is stored and this receiver
    ///   holds the only remaining handle, i.e. the sender is gone.
    /// * `Poll::Pending` - no value yet and the sender is still alive; the
    ///   waker from `cx` has been registered to be woken by the sender.
    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
        // A stored value takes precedence over the cancellation check, so a
        // value sent just before the sender was dropped is still delivered.
        let stored = self.inner.get_mut().value.take();

        match stored {
            Some(val) => Poll::Ready(Ok(val)),
            None if self.inner.strong_count() == 1 => Poll::Ready(Err(Canceled)),
            None => {
                self.inner.get_ref().rx_task.register(cx.waker());
                Poll::Pending
            }
        }
    }
}
/// Awaiting a `Receiver` directly resolves to the same result as `poll_recv`.
impl<T> Future for Receiver<T> {
    type Output = Result<T, Canceled>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `poll_recv` only needs a shared reference, so reborrow through the pin.
        let this: &Self = &*self;
        this.poll_recv(cx)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::future::lazy;

    /// Walks through the send, receive and cancellation paths of the channel.
    #[ntex_macros::rt_test2]
    async fn test_oneshot() {
        // Both halves are `Debug`; a sent value is delivered by awaiting the receiver.
        let (sender, receiver) = channel();
        assert!(format!("{:?}", sender).contains("Sender"));
        assert!(format!("{:?}", receiver).contains("Receiver"));
        sender.send("test").unwrap();
        assert_eq!(receiver.await.unwrap(), "test");

        // The same delivery, observed through `recv` on a borrowed receiver.
        let (sender, receiver) = channel();
        sender.send("test").unwrap();
        assert_eq!(receiver.recv().await.unwrap(), "test");

        // Dropping the receiver cancels the channel and makes `send` fail.
        let (sender, receiver) = channel();
        assert!(!sender.is_canceled());
        drop(receiver);
        assert!(sender.is_canceled());
        assert!(sender.send("test").is_err());

        // Dropping the sender without sending resolves the receiver with an error.
        let (sender, receiver) = channel::<&'static str>();
        drop(sender);
        assert!(receiver.await.is_err());

        // A receiver polled before the send is pending, then gets the value.
        let (sender, mut receiver) = channel::<&'static str>();
        let first_poll = lazy(|cx| Pin::new(&mut receiver).poll(cx)).await;
        assert_eq!(first_poll, Poll::Pending);
        sender.send("test").unwrap();
        assert_eq!(receiver.await.unwrap(), "test");

        // A receiver polled before the sender is dropped is pending, then errors.
        let (sender, mut receiver) = channel::<&'static str>();
        let first_poll = lazy(|cx| Pin::new(&mut receiver).poll(cx)).await;
        assert_eq!(first_poll, Poll::Pending);
        drop(sender);
        assert!(receiver.await.is_err());
    }
}