tokio_sync/mpsc/
unbounded.rs1use super::chan;
2
3use futures::{Poll, Sink, StartSend, Stream};
4use loom::sync::atomic::AtomicUsize;
5
6use std::fmt;
7
8pub struct UnboundedSender<T> {
13 chan: chan::Tx<T, Semaphore>,
14}
15
16impl<T> Clone for UnboundedSender<T> {
17 fn clone(&self) -> Self {
18 UnboundedSender {
19 chan: self.chan.clone(),
20 }
21 }
22}
23
24impl<T> fmt::Debug for UnboundedSender<T> {
25 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
26 fmt.debug_struct("UnboundedSender")
27 .field("chan", &self.chan)
28 .finish()
29 }
30}
31
32pub struct UnboundedReceiver<T> {
37 chan: chan::Rx<T, Semaphore>,
39}
40
41impl<T> fmt::Debug for UnboundedReceiver<T> {
42 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
43 fmt.debug_struct("UnboundedReceiver")
44 .field("chan", &self.chan)
45 .finish()
46 }
47}
48
49#[derive(Debug)]
51pub struct UnboundedSendError(());
52
53#[derive(Debug)]
55pub struct UnboundedTrySendError<T>(T);
56
57#[derive(Debug)]
59pub struct UnboundedRecvError(());
60
61pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
72 let (tx, rx) = chan::channel(AtomicUsize::new(0));
73
74 let tx = UnboundedSender::new(tx);
75 let rx = UnboundedReceiver::new(rx);
76
77 (tx, rx)
78}
79
80type Semaphore = AtomicUsize;
82
83impl<T> UnboundedReceiver<T> {
84 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
85 UnboundedReceiver { chan }
86 }
87
88 pub fn close(&mut self) {
93 self.chan.close();
94 }
95}
96
97impl<T> Stream for UnboundedReceiver<T> {
98 type Item = T;
99 type Error = UnboundedRecvError;
100
101 fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
102 self.chan.recv().map_err(|_| UnboundedRecvError(()))
103 }
104}
105
106impl<T> UnboundedSender<T> {
107 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
108 UnboundedSender { chan }
109 }
110
111 pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError<T>> {
113 self.chan.try_send(message)?;
114 Ok(())
115 }
116}
117
118impl<T> Sink for UnboundedSender<T> {
119 type SinkItem = T;
120 type SinkError = UnboundedSendError;
121
122 fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
123 use futures::AsyncSink;
124
125 self.try_send(msg).map_err(|_| UnboundedSendError(()))?;
126 Ok(AsyncSink::Ready)
127 }
128
129 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
130 use futures::Async::Ready;
131 Ok(Ready(()))
132 }
133
134 fn close(&mut self) -> Poll<(), Self::SinkError> {
135 use futures::Async::Ready;
136 Ok(Ready(()))
137 }
138}
139
140impl fmt::Display for UnboundedSendError {
143 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
144 use std::error::Error;
145 write!(fmt, "{}", self.description())
146 }
147}
148
149impl ::std::error::Error for UnboundedSendError {
150 fn description(&self) -> &str {
151 "channel closed"
152 }
153}
154
155impl<T> UnboundedTrySendError<T> {
158 pub fn into_inner(self) -> T {
160 self.0
161 }
162}
163
164impl<T: fmt::Debug> fmt::Display for UnboundedTrySendError<T> {
165 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
166 use std::error::Error;
167 write!(fmt, "{}", self.description())
168 }
169}
170
171impl<T: fmt::Debug> ::std::error::Error for UnboundedTrySendError<T> {
172 fn description(&self) -> &str {
173 "channel closed"
174 }
175}
176
177impl<T> From<(T, chan::TrySendError)> for UnboundedTrySendError<T> {
178 fn from((value, err): (T, chan::TrySendError)) -> UnboundedTrySendError<T> {
179 assert_eq!(chan::TrySendError::Closed, err);
180 UnboundedTrySendError(value)
181 }
182}
183
184impl fmt::Display for UnboundedRecvError {
187 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
188 use std::error::Error;
189 write!(fmt, "{}", self.description())
190 }
191}
192
193impl ::std::error::Error for UnboundedRecvError {
194 fn description(&self) -> &str {
195 "channel closed"
196 }
197}