1use lazycell::{AtomicLazyCell, LazyCell};
3use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
4use std::any::Any;
5use std::error;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{mpsc, Arc};
8use std::{fmt, io};
9
10pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
13 let (tx_ctl, rx_ctl) = ctl_pair();
14 let (tx, rx) = mpsc::channel();
15
16 let tx = Sender { tx, ctl: tx_ctl };
17
18 let rx = Receiver { rx, ctl: rx_ctl };
19
20 (tx, rx)
21}
22
23pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
26 let (tx_ctl, rx_ctl) = ctl_pair();
27 let (tx, rx) = mpsc::sync_channel(bound);
28
29 let tx = SyncSender { tx, ctl: tx_ctl };
30
31 let rx = Receiver { rx, ctl: rx_ctl };
32
33 (tx, rx)
34}
35
36fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
37 let inner = Arc::new(Inner {
38 pending: AtomicUsize::new(0),
39 senders: AtomicUsize::new(1),
40 set_readiness: AtomicLazyCell::new(),
41 });
42
43 let tx = SenderCtl {
44 inner: Arc::clone(&inner),
45 };
46
47 let rx = ReceiverCtl {
48 registration: LazyCell::new(),
49 inner,
50 };
51
52 (tx, rx)
53}
54
55struct SenderCtl {
57 inner: Arc<Inner>,
58}
59
60struct ReceiverCtl {
62 registration: LazyCell<Registration>,
63 inner: Arc<Inner>,
64}
65
66pub struct Sender<T> {
68 tx: mpsc::Sender<T>,
69 ctl: SenderCtl,
70}
71
72pub struct SyncSender<T> {
74 tx: mpsc::SyncSender<T>,
75 ctl: SenderCtl,
76}
77
78pub struct Receiver<T> {
80 rx: mpsc::Receiver<T>,
81 ctl: ReceiverCtl,
82}
83
/// Error returned by the blocking `send` methods of `Sender` and
/// `SyncSender`.
pub enum SendError<T> {
    /// Updating the receiver's readiness failed with an I/O error.
    Io(io::Error),

    /// The underlying channel reported a send failure; the value that could
    /// not be sent is handed back to the caller.
    Disconnected(T),
}
92
/// Error returned by `SyncSender::try_send`.
pub enum TrySendError<T> {
    /// Updating the receiver's readiness failed with an I/O error.
    Io(io::Error),

    /// The underlying channel reported that it is full; the value is handed
    /// back to the caller.
    Full(T),

    /// The underlying channel reported that it is disconnected; the value is
    /// handed back to the caller.
    Disconnected(T),
}
104
105struct Inner {
106 pending: AtomicUsize,
108 senders: AtomicUsize,
110 set_readiness: AtomicLazyCell<SetReadiness>,
112}
113
114impl<T> Sender<T> {
115 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
117 self.tx.send(t).map_err(SendError::from).and_then(|_| {
118 self.ctl.inc()?;
119 Ok(())
120 })
121 }
122}
123
124impl<T> Clone for Sender<T> {
125 fn clone(&self) -> Sender<T> {
126 Sender {
127 tx: self.tx.clone(),
128 ctl: self.ctl.clone(),
129 }
130 }
131}
132
133impl<T> SyncSender<T> {
134 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
139 self.tx.send(t).map_err(From::from).and_then(|_| {
140 self.ctl.inc()?;
141 Ok(())
142 })
143 }
144
145 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
150 self.tx.try_send(t).map_err(From::from).and_then(|_| {
151 self.ctl.inc()?;
152 Ok(())
153 })
154 }
155}
156
157impl<T> Clone for SyncSender<T> {
158 fn clone(&self) -> SyncSender<T> {
159 SyncSender {
160 tx: self.tx.clone(),
161 ctl: self.ctl.clone(),
162 }
163 }
164}
165
166impl<T> Receiver<T> {
167 pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
169 self.rx.try_recv().and_then(|res| {
170 let _ = self.ctl.dec();
171 Ok(res)
172 })
173 }
174}
175
176impl<T> Evented for Receiver<T> {
177 fn register(
178 &self,
179 poll: &Poll,
180 token: Token,
181 interest: Ready,
182 opts: PollOpt,
183 ) -> io::Result<()> {
184 self.ctl.register(poll, token, interest, opts)
185 }
186
187 fn reregister(
188 &self,
189 poll: &Poll,
190 token: Token,
191 interest: Ready,
192 opts: PollOpt,
193 ) -> io::Result<()> {
194 self.ctl.reregister(poll, token, interest, opts)
195 }
196
197 fn deregister(&self, poll: &Poll) -> io::Result<()> {
198 self.ctl.deregister(poll)
199 }
200}
201
202impl SenderCtl {
209 fn inc(&self) -> io::Result<()> {
211 let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
212
213 if 0 == cnt {
214 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
216 set_readiness.set_readiness(Ready::readable())?;
217 }
218 }
219
220 Ok(())
221 }
222}
223
224impl Clone for SenderCtl {
225 fn clone(&self) -> SenderCtl {
226 self.inner.senders.fetch_add(1, Ordering::Relaxed);
227 SenderCtl {
228 inner: Arc::clone(&self.inner),
229 }
230 }
231}
232
233impl Drop for SenderCtl {
234 fn drop(&mut self) {
235 if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
236 let _ = self.inc();
237 }
238 }
239}
240
241impl ReceiverCtl {
242 fn dec(&self) -> io::Result<()> {
243 let first = self.inner.pending.load(Ordering::Acquire);
244
245 if first == 1 {
246 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
248 set_readiness.set_readiness(Ready::empty())?;
249 }
250 }
251
252 let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
254
255 if first == 1 && second > 1 {
256 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
259 set_readiness.set_readiness(Ready::readable())?;
260 }
261 }
262
263 Ok(())
264 }
265}
266
267impl Evented for ReceiverCtl {
268 fn register(
269 &self,
270 poll: &Poll,
271 token: Token,
272 interest: Ready,
273 opts: PollOpt,
274 ) -> io::Result<()> {
275 if self.registration.borrow().is_some() {
276 return Err(io::Error::new(
277 io::ErrorKind::Other,
278 "receiver already registered",
279 ));
280 }
281
282 let (registration, set_readiness) = Registration::new2();
283 poll.register(®istration, token, interest, opts)?;
284
285 if self.inner.pending.load(Ordering::Relaxed) > 0 {
286 let _ = set_readiness.set_readiness(Ready::readable());
288 }
289
290 self.registration
291 .fill(registration)
292 .expect("unexpected state encountered");
293 self.inner
294 .set_readiness
295 .fill(set_readiness)
296 .expect("unexpected state encountered");
297
298 Ok(())
299 }
300
301 fn reregister(
302 &self,
303 poll: &Poll,
304 token: Token,
305 interest: Ready,
306 opts: PollOpt,
307 ) -> io::Result<()> {
308 match self.registration.borrow() {
309 Some(registration) => poll.reregister(registration, token, interest, opts),
310 None => Err(io::Error::new(
311 io::ErrorKind::Other,
312 "receiver not registered",
313 )),
314 }
315 }
316
317 fn deregister(&self, poll: &Poll) -> io::Result<()> {
318 match self.registration.borrow() {
319 Some(registration) => poll.deregister(registration),
320 None => Err(io::Error::new(
321 io::ErrorKind::Other,
322 "receiver not registered",
323 )),
324 }
325 }
326}
327
328impl<T> From<mpsc::SendError<T>> for SendError<T> {
335 fn from(src: mpsc::SendError<T>) -> SendError<T> {
336 SendError::Disconnected(src.0)
337 }
338}
339
340impl<T> From<io::Error> for SendError<T> {
341 fn from(src: io::Error) -> SendError<T> {
342 SendError::Io(src)
343 }
344}
345
346impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
347 fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
348 match src {
349 mpsc::TrySendError::Full(v) => TrySendError::Full(v),
350 mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
351 }
352 }
353}
354
355impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
356 fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
357 TrySendError::Disconnected(src.0)
358 }
359}
360
361impl<T> From<io::Error> for TrySendError<T> {
362 fn from(src: io::Error) -> TrySendError<T> {
363 TrySendError::Io(src)
364 }
365}
366
367impl<T: Any> error::Error for SendError<T> {
374 fn description(&self) -> &str {
375 match *self {
376 SendError::Io(ref io_err) => io_err.description(),
377 SendError::Disconnected(..) => "Disconnected",
378 }
379 }
380}
381
382impl<T: Any> error::Error for TrySendError<T> {
383 fn description(&self) -> &str {
384 match *self {
385 TrySendError::Io(ref io_err) => io_err.description(),
386 TrySendError::Full(..) => "Full",
387 TrySendError::Disconnected(..) => "Disconnected",
388 }
389 }
390}
391
392impl<T> fmt::Debug for SendError<T> {
393 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
394 format_send_error(self, f)
395 }
396}
397
398impl<T> fmt::Display for SendError<T> {
399 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
400 format_send_error(self, f)
401 }
402}
403
404impl<T> fmt::Debug for TrySendError<T> {
405 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
406 format_try_send_error(self, f)
407 }
408}
409
410impl<T> fmt::Display for TrySendError<T> {
411 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
412 format_try_send_error(self, f)
413 }
414}
415
416#[inline]
417fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
418 match *e {
419 SendError::Io(ref io_err) => write!(f, "{}", io_err),
420 SendError::Disconnected(..) => write!(f, "Disconnected"),
421 }
422}
423
424#[inline]
425fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
426 match *e {
427 TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
428 TrySendError::Full(..) => write!(f, "Full"),
429 TrySendError::Disconnected(..) => write!(f, "Disconnected"),
430 }
431}