1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use super::{
    chan::{self, SendError, TryRecvError},
    semaphore::Unlimited,
};
use futures_util::future::poll_fn;
use std::task::{Context, Poll};

/// Sending half of the channel returned by [`channel`].
///
/// A newtype around `chan::Tx` specialised with the `Unlimited` semaphore.
// NOTE(review): `#[derive(Clone)]` generates `impl<T: Clone> Clone for Tx<T>`, so a
// `Tx<T>` can only be cloned when `T` itself is `Clone`. If `chan::Tx<T, Unlimited>`
// is `Clone` for every `T`, a hand-written `impl<T> Clone for Tx<T>` would lift that
// restriction — confirm against `chan` before changing.
#[derive(Clone)]
pub struct Tx<T>(chan::Tx<T, Unlimited>);

/// Receiving half of the channel returned by [`channel`].
///
/// A newtype around `chan::Rx` specialised with the `Unlimited` semaphore.
/// Unlike [`Tx`], it does not derive `Clone`.
pub struct Rx<T>(chan::Rx<T, Unlimited>);

/// Creates a channel backed by a fresh `Unlimited` semaphore.
///
/// Returns the pair `(sender, receiver)`: the first element is the [`Tx`] half,
/// the second the [`Rx`] half, both wrapping the halves produced by
/// `chan::channel`.
pub fn channel<T>() -> (Tx<T>, Rx<T>) {
    let (sender, receiver) = chan::channel(Unlimited::new());
    (Tx(sender), Rx(receiver))
}

impl<T> Tx<T> {
    pub fn send(&self, value: T) -> Result<(), SendError> {
        self.0.send(value)
    }

    pub fn is_closed(&self) -> bool {
        self.0.is_closed()
    }

    pub fn same_channel(&self, other: &Self) -> bool {
        self.0.same_channel(&other.0)
    }
}

impl<T> Rx<T> {
    pub async fn recv(&mut self) -> Option<T> {
        poll_fn(|cx| self.poll_recv(cx)).await
    }

    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        self.0.recv(cx)
    }

    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.0.try_recv()
    }

    pub fn close(&mut self) {
        self.0.close()
    }
}

#[cfg(test)]
mod tests {
    use super::channel;

    /// A sent value is received, and `recv` yields `None` once the only
    /// sender has been dropped.
    #[monoio::test]
    async fn test_unbounded_channel() {
        let (tx, mut rx) = channel();
        tx.send(1).unwrap();
        // Compare against `Some(1)` so a failure prints the actual `Option`
        // instead of panicking inside `unwrap`.
        assert_eq!(rx.recv().await, Some(1));

        drop(tx);
        assert_eq!(rx.recv().await, None);
    }
}