mio_extras/
channel.rs

1//! Thread safe communication channel implementing `Evented`
2use lazycell::{AtomicLazyCell, LazyCell};
3use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
4use std::any::Any;
5use std::error;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{mpsc, Arc};
8use std::{fmt, io};
9
10/// Creates a new asynchronous channel, where the `Receiver` can be registered
11/// with `Poll`.
12pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
13    let (tx_ctl, rx_ctl) = ctl_pair();
14    let (tx, rx) = mpsc::channel();
15
16    let tx = Sender { tx, ctl: tx_ctl };
17
18    let rx = Receiver { rx, ctl: rx_ctl };
19
20    (tx, rx)
21}
22
23/// Creates a new synchronous, bounded channel where the `Receiver` can be
24/// registered with `Poll`.
25pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
26    let (tx_ctl, rx_ctl) = ctl_pair();
27    let (tx, rx) = mpsc::sync_channel(bound);
28
29    let tx = SyncSender { tx, ctl: tx_ctl };
30
31    let rx = Receiver { rx, ctl: rx_ctl };
32
33    (tx, rx)
34}
35
36fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
37    let inner = Arc::new(Inner {
38        pending: AtomicUsize::new(0),
39        senders: AtomicUsize::new(1),
40        set_readiness: AtomicLazyCell::new(),
41    });
42
43    let tx = SenderCtl {
44        inner: Arc::clone(&inner),
45    };
46
47    let rx = ReceiverCtl {
48        registration: LazyCell::new(),
49        inner,
50    };
51
52    (tx, rx)
53}
54
55/// Tracks messages sent on a channel in order to update readiness.
56struct SenderCtl {
57    inner: Arc<Inner>,
58}
59
60/// Tracks messages received on a channel in order to track readiness.
61struct ReceiverCtl {
62    registration: LazyCell<Registration>,
63    inner: Arc<Inner>,
64}
65
66/// The sending half of a channel.
67pub struct Sender<T> {
68    tx: mpsc::Sender<T>,
69    ctl: SenderCtl,
70}
71
72/// The sending half of a synchronous channel.
73pub struct SyncSender<T> {
74    tx: mpsc::SyncSender<T>,
75    ctl: SenderCtl,
76}
77
78/// The receiving half of a channel.
79pub struct Receiver<T> {
80    rx: mpsc::Receiver<T>,
81    ctl: ReceiverCtl,
82}
83
84/// An error returned from the `Sender::send` or `SyncSender::send` function.
85pub enum SendError<T> {
86    /// An IO error.
87    Io(io::Error),
88
89    /// The receiving half of the channel has disconnected.
90    Disconnected(T),
91}
92
93/// An error returned from the `SyncSender::try_send` function.
94pub enum TrySendError<T> {
95    /// An IO error.
96    Io(io::Error),
97
98    /// Data could not be sent because it would require the callee to block.
99    Full(T),
100
101    /// The receiving half of the channel has disconnected.
102    Disconnected(T),
103}
104
105struct Inner {
106    // The number of outstanding messages for the receiver to read
107    pending: AtomicUsize,
108    // The number of sender handles
109    senders: AtomicUsize,
110    // The set readiness handle
111    set_readiness: AtomicLazyCell<SetReadiness>,
112}
113
114impl<T> Sender<T> {
115    /// Attempts to send a value on this channel, returning it back if it could not be sent.
116    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
117        self.tx.send(t).map_err(SendError::from).and_then(|_| {
118            self.ctl.inc()?;
119            Ok(())
120        })
121    }
122}
123
124impl<T> Clone for Sender<T> {
125    fn clone(&self) -> Sender<T> {
126        Sender {
127            tx: self.tx.clone(),
128            ctl: self.ctl.clone(),
129        }
130    }
131}
132
133impl<T> SyncSender<T> {
134    /// Sends a value on this synchronous channel.
135    ///
136    /// This function will *block* until space in the internal buffer becomes
137    /// available or a receiver is available to hand off the message to.
138    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
139        self.tx.send(t).map_err(From::from).and_then(|_| {
140            self.ctl.inc()?;
141            Ok(())
142        })
143    }
144
145    /// Attempts to send a value on this channel without blocking.
146    ///
147    /// This method differs from `send` by returning immediately if the channel's
148    /// buffer is full or no receiver is waiting to acquire some data.
149    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
150        self.tx.try_send(t).map_err(From::from).and_then(|_| {
151            self.ctl.inc()?;
152            Ok(())
153        })
154    }
155}
156
157impl<T> Clone for SyncSender<T> {
158    fn clone(&self) -> SyncSender<T> {
159        SyncSender {
160            tx: self.tx.clone(),
161            ctl: self.ctl.clone(),
162        }
163    }
164}
165
166impl<T> Receiver<T> {
167    /// Attempts to return a pending value on this receiver without blocking.
168    pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
169        self.rx.try_recv().and_then(|res| {
170            let _ = self.ctl.dec();
171            Ok(res)
172        })
173    }
174}
175
176impl<T> Evented for Receiver<T> {
177    fn register(
178        &self,
179        poll: &Poll,
180        token: Token,
181        interest: Ready,
182        opts: PollOpt,
183    ) -> io::Result<()> {
184        self.ctl.register(poll, token, interest, opts)
185    }
186
187    fn reregister(
188        &self,
189        poll: &Poll,
190        token: Token,
191        interest: Ready,
192        opts: PollOpt,
193    ) -> io::Result<()> {
194        self.ctl.reregister(poll, token, interest, opts)
195    }
196
197    fn deregister(&self, poll: &Poll) -> io::Result<()> {
198        self.ctl.deregister(poll)
199    }
200}
201
202/*
203 *
204 * ===== SenderCtl / ReceiverCtl =====
205 *
206 */
207
208impl SenderCtl {
209    /// Call to track that a message has been sent
210    fn inc(&self) -> io::Result<()> {
211        let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
212
213        if 0 == cnt {
214            // Toggle readiness to readable
215            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
216                set_readiness.set_readiness(Ready::readable())?;
217            }
218        }
219
220        Ok(())
221    }
222}
223
224impl Clone for SenderCtl {
225    fn clone(&self) -> SenderCtl {
226        self.inner.senders.fetch_add(1, Ordering::Relaxed);
227        SenderCtl {
228            inner: Arc::clone(&self.inner),
229        }
230    }
231}
232
233impl Drop for SenderCtl {
234    fn drop(&mut self) {
235        if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
236            let _ = self.inc();
237        }
238    }
239}
240
241impl ReceiverCtl {
242    fn dec(&self) -> io::Result<()> {
243        let first = self.inner.pending.load(Ordering::Acquire);
244
245        if first == 1 {
246            // Unset readiness
247            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
248                set_readiness.set_readiness(Ready::empty())?;
249            }
250        }
251
252        // Decrement
253        let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
254
255        if first == 1 && second > 1 {
256            // There are still pending messages. Since readiness was
257            // previously unset, it must be reset here
258            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
259                set_readiness.set_readiness(Ready::readable())?;
260            }
261        }
262
263        Ok(())
264    }
265}
266
267impl Evented for ReceiverCtl {
268    fn register(
269        &self,
270        poll: &Poll,
271        token: Token,
272        interest: Ready,
273        opts: PollOpt,
274    ) -> io::Result<()> {
275        if self.registration.borrow().is_some() {
276            return Err(io::Error::new(
277                io::ErrorKind::Other,
278                "receiver already registered",
279            ));
280        }
281
282        let (registration, set_readiness) = Registration::new2();
283        poll.register(&registration, token, interest, opts)?;
284
285        if self.inner.pending.load(Ordering::Relaxed) > 0 {
286            // TODO: Don't drop readiness
287            let _ = set_readiness.set_readiness(Ready::readable());
288        }
289
290        self.registration
291            .fill(registration)
292            .expect("unexpected state encountered");
293        self.inner
294            .set_readiness
295            .fill(set_readiness)
296            .expect("unexpected state encountered");
297
298        Ok(())
299    }
300
301    fn reregister(
302        &self,
303        poll: &Poll,
304        token: Token,
305        interest: Ready,
306        opts: PollOpt,
307    ) -> io::Result<()> {
308        match self.registration.borrow() {
309            Some(registration) => poll.reregister(registration, token, interest, opts),
310            None => Err(io::Error::new(
311                io::ErrorKind::Other,
312                "receiver not registered",
313            )),
314        }
315    }
316
317    fn deregister(&self, poll: &Poll) -> io::Result<()> {
318        match self.registration.borrow() {
319            Some(registration) => poll.deregister(registration),
320            None => Err(io::Error::new(
321                io::ErrorKind::Other,
322                "receiver not registered",
323            )),
324        }
325    }
326}
327
328/*
329 *
330 * ===== Error conversions =====
331 *
332 */
333
334impl<T> From<mpsc::SendError<T>> for SendError<T> {
335    fn from(src: mpsc::SendError<T>) -> SendError<T> {
336        SendError::Disconnected(src.0)
337    }
338}
339
340impl<T> From<io::Error> for SendError<T> {
341    fn from(src: io::Error) -> SendError<T> {
342        SendError::Io(src)
343    }
344}
345
346impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
347    fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
348        match src {
349            mpsc::TrySendError::Full(v) => TrySendError::Full(v),
350            mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
351        }
352    }
353}
354
355impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
356    fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
357        TrySendError::Disconnected(src.0)
358    }
359}
360
361impl<T> From<io::Error> for TrySendError<T> {
362    fn from(src: io::Error) -> TrySendError<T> {
363        TrySendError::Io(src)
364    }
365}
366
367/*
368 *
369 * ===== Implement Error, Debug and Display for Errors =====
370 *
371 */
372
373impl<T: Any> error::Error for SendError<T> {
374    fn description(&self) -> &str {
375        match *self {
376            SendError::Io(ref io_err) => io_err.description(),
377            SendError::Disconnected(..) => "Disconnected",
378        }
379    }
380}
381
382impl<T: Any> error::Error for TrySendError<T> {
383    fn description(&self) -> &str {
384        match *self {
385            TrySendError::Io(ref io_err) => io_err.description(),
386            TrySendError::Full(..) => "Full",
387            TrySendError::Disconnected(..) => "Disconnected",
388        }
389    }
390}
391
392impl<T> fmt::Debug for SendError<T> {
393    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
394        format_send_error(self, f)
395    }
396}
397
398impl<T> fmt::Display for SendError<T> {
399    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
400        format_send_error(self, f)
401    }
402}
403
404impl<T> fmt::Debug for TrySendError<T> {
405    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
406        format_try_send_error(self, f)
407    }
408}
409
410impl<T> fmt::Display for TrySendError<T> {
411    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
412        format_try_send_error(self, f)
413    }
414}
415
416#[inline]
417fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
418    match *e {
419        SendError::Io(ref io_err) => write!(f, "{}", io_err),
420        SendError::Disconnected(..) => write!(f, "Disconnected"),
421    }
422}
423
424#[inline]
425fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
426    match *e {
427        TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
428        TrySendError::Full(..) => write!(f, "Full"),
429        TrySendError::Disconnected(..) => write!(f, "Disconnected"),
430    }
431}