tokio_sync/mpsc/
unbounded.rs

1use super::chan;
2
3use futures::{Poll, Sink, StartSend, Stream};
4use loom::sync::atomic::AtomicUsize;
5
6use std::fmt;
7
/// Send values to the associated `UnboundedReceiver`.
///
/// Messages are submitted with `try_send` or through the `Sink`
/// implementation. The handle implements `Clone`; a clone wraps a clone of
/// the same underlying `chan::Tx`.
///
/// Instances are created by the
/// [`unbounded_channel`](fn.unbounded_channel.html) function.
pub struct UnboundedSender<T> {
    /// The channel sender
    chan: chan::Tx<T, Semaphore>,
}
15
16impl<T> Clone for UnboundedSender<T> {
17    fn clone(&self) -> Self {
18        UnboundedSender {
19            chan: self.chan.clone(),
20        }
21    }
22}
23
24impl<T> fmt::Debug for UnboundedSender<T> {
25    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
26        fmt.debug_struct("UnboundedSender")
27            .field("chan", &self.chan)
28            .finish()
29    }
30}
31
/// Receive values from the associated `UnboundedSender`.
///
/// Values are obtained by polling the `Stream` implementation. `close` shuts
/// the receiving half without dropping it.
///
/// Instances are created by the
/// [`unbounded_channel`](fn.unbounded_channel.html) function.
pub struct UnboundedReceiver<T> {
    /// The channel receiver
    chan: chan::Rx<T, Semaphore>,
}
40
41impl<T> fmt::Debug for UnboundedReceiver<T> {
42    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
43        fmt.debug_struct("UnboundedReceiver")
44            .field("chan", &self.chan)
45            .finish()
46    }
47}
48
/// Error returned by the `UnboundedSender`.
///
/// This is the `SinkError` of the sender's `Sink` implementation:
/// `start_send` produces it when the underlying `try_send` fails. It carries
/// no payload, so the rejected message cannot be recovered from it.
#[derive(Debug)]
pub struct UnboundedSendError(());
52
/// Returned by `UnboundedSender::try_send` when the channel has been closed.
///
/// The error owns the message that could not be sent; call `into_inner` to
/// take it back.
#[derive(Debug)]
pub struct UnboundedTrySendError<T>(T);
56
/// Error returned by `UnboundedReceiver`.
///
/// This is the `Error` type of the receiver's `Stream` implementation: `poll`
/// replaces any error reported by the underlying channel with this value. It
/// carries no payload.
#[derive(Debug)]
pub struct UnboundedRecvError(());
60
61/// Create an unbounded mpsc channel for communicating between asynchronous
62/// tasks.
63///
64/// A `send` on this channel will always succeed as long as the receive half has
65/// not been closed. If the receiver falls behind, messages will be arbitrarily
66/// buffered.
67///
68/// **Note** that the amount of available system memory is an implicit bound to
69/// the channel. Using an `unbounded` channel has the ability of causing the
70/// process to run out of memory. In this case, the process will be aborted.
71pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
72    let (tx, rx) = chan::channel(AtomicUsize::new(0));
73
74    let tx = UnboundedSender::new(tx);
75    let rx = UnboundedReceiver::new(rx);
76
77    (tx, rx)
78}
79
/// No capacity
///
/// A plain `AtomicUsize` fills the semaphore type parameter of `chan::Tx` and
/// `chan::Rx` for the unbounded channel; `unbounded_channel` creates it with
/// an initial value of `0`. How the shared channel code uses the value is
/// defined in the `chan` module, not here.
type Semaphore = AtomicUsize;
82
83impl<T> UnboundedReceiver<T> {
84    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
85        UnboundedReceiver { chan }
86    }
87
88    /// Closes the receiving half of a channel, without dropping it.
89    ///
90    /// This prevents any further messages from being sent on the channel while
91    /// still enabling the receiver to drain messages that are buffered.
92    pub fn close(&mut self) {
93        self.chan.close();
94    }
95}
96
97impl<T> Stream for UnboundedReceiver<T> {
98    type Item = T;
99    type Error = UnboundedRecvError;
100
101    fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
102        self.chan.recv().map_err(|_| UnboundedRecvError(()))
103    }
104}
105
106impl<T> UnboundedSender<T> {
107    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
108        UnboundedSender { chan }
109    }
110
111    /// Attempts to send a message on this `UnboundedSender` without blocking.
112    pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError<T>> {
113        self.chan.try_send(message)?;
114        Ok(())
115    }
116}
117
118impl<T> Sink for UnboundedSender<T> {
119    type SinkItem = T;
120    type SinkError = UnboundedSendError;
121
122    fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
123        use futures::AsyncSink;
124
125        self.try_send(msg).map_err(|_| UnboundedSendError(()))?;
126        Ok(AsyncSink::Ready)
127    }
128
129    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
130        use futures::Async::Ready;
131        Ok(Ready(()))
132    }
133
134    fn close(&mut self) -> Poll<(), Self::SinkError> {
135        use futures::Async::Ready;
136        Ok(Ready(()))
137    }
138}
139
140// ===== impl UnboundedSendError =====
141
142impl fmt::Display for UnboundedSendError {
143    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
144        use std::error::Error;
145        write!(fmt, "{}", self.description())
146    }
147}
148
impl ::std::error::Error for UnboundedSendError {
    /// Short, fixed description of the error. The `Display` implementation
    /// prints this same text.
    fn description(&self) -> &str {
        "channel closed"
    }
}
154
// ===== impl UnboundedTrySendError =====
156
157impl<T> UnboundedTrySendError<T> {
158    /// Get the inner value.
159    pub fn into_inner(self) -> T {
160        self.0
161    }
162}
163
164impl<T: fmt::Debug> fmt::Display for UnboundedTrySendError<T> {
165    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
166        use std::error::Error;
167        write!(fmt, "{}", self.description())
168    }
169}
170
impl<T: fmt::Debug> ::std::error::Error for UnboundedTrySendError<T> {
    /// Short, fixed description of the error. The `Display` implementation
    /// prints this same text.
    fn description(&self) -> &str {
        "channel closed"
    }
}
176
177impl<T> From<(T, chan::TrySendError)> for UnboundedTrySendError<T> {
178    fn from((value, err): (T, chan::TrySendError)) -> UnboundedTrySendError<T> {
179        assert_eq!(chan::TrySendError::Closed, err);
180        UnboundedTrySendError(value)
181    }
182}
183
184// ===== impl UnboundedRecvError =====
185
186impl fmt::Display for UnboundedRecvError {
187    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
188        use std::error::Error;
189        write!(fmt, "{}", self.description())
190    }
191}
192
impl ::std::error::Error for UnboundedRecvError {
    /// Short, fixed description of the error. The `Display` implementation
    /// prints this same text.
    fn description(&self) -> &str {
        "channel closed"
    }
}