tokio_sync/mpsc/
bounded.rs

1use super::chan;
2
3use futures::{Poll, Sink, StartSend, Stream};
4
5use std::fmt;
6
7/// Send values to the associated `Receiver`.
8///
9/// Instances are created by the [`channel`](fn.channel.html) function.
10pub struct Sender<T> {
11    chan: chan::Tx<T, Semaphore>,
12}
13
14impl<T> Clone for Sender<T> {
15    fn clone(&self) -> Self {
16        Sender {
17            chan: self.chan.clone(),
18        }
19    }
20}
21
22impl<T> fmt::Debug for Sender<T> {
23    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
24        fmt.debug_struct("Sender")
25            .field("chan", &self.chan)
26            .finish()
27    }
28}
29
30/// Receive values from the associated `Sender`.
31///
32/// Instances are created by the [`channel`](fn.channel.html) function.
33pub struct Receiver<T> {
34    /// The channel receiver
35    chan: chan::Rx<T, Semaphore>,
36}
37
38impl<T> fmt::Debug for Receiver<T> {
39    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
40        fmt.debug_struct("Receiver")
41            .field("chan", &self.chan)
42            .finish()
43    }
44}
45
/// Error returned by the `Sender`.
///
/// The private unit field prevents construction outside this module.
#[derive(Debug)]
pub struct SendError(());
49
50/// Error returned by `Sender::try_send`.
51#[derive(Debug)]
52pub struct TrySendError<T> {
53    kind: ErrorKind,
54    value: T,
55}
56
/// Reason a `try_send` call failed.
#[derive(Debug)]
enum ErrorKind {
    /// The channel has been closed.
    Closed,
    /// The channel has no available capacity.
    NoCapacity,
}
62
/// Error returned by `Receiver`.
///
/// The private unit field prevents construction outside this module.
#[derive(Debug)]
pub struct RecvError(());
66
67/// Create a bounded mpsc channel for communicating between asynchronous tasks,
68/// returning the sender/receiver halves.
69///
70/// All data sent on `Sender` will become available on `Receiver` in the same
71/// order as it was sent.
72///
73/// The `Sender` can be cloned to `send` to the same channel from multiple code
74/// locations. Only one `Receiver` is supported.
75///
76/// If the `Receiver` is disconnected while trying to `send`, the `send` method
77/// will return a `SendError`. Similarly, if `Sender` is disconnected while
78/// trying to `recv`, the `recv` method will return a `RecvError`.
79///
80/// # Examples
81///
82/// ```rust
83/// extern crate futures;
84/// extern crate tokio;
85///
86/// use tokio::sync::mpsc::channel;
87/// use tokio::prelude::*;
88/// use futures::future::lazy;
89///
90/// # fn some_computation() -> impl Future<Item = (), Error = ()> + Send {
91/// # futures::future::ok::<(), ()>(())
92/// # }
93///
94/// tokio::run(lazy(|| {
95///     let (tx, rx) = channel(100);
96///
97///     tokio::spawn({
98///         some_computation()
99///             .and_then(|value| {
100///                 tx.send(value)
101///                     .map_err(|_| ())
102///             })
103///             .map(|_| ())
104///             .map_err(|_| ())
105///     });
106///
107///     rx.for_each(|value| {
108///         println!("got value = {:?}", value);
109///         Ok(())
110///     })
111///     .map(|_| ())
112///     .map_err(|_| ())
113/// }));
114/// ```
115pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
116    assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
117    let semaphore = (::semaphore::Semaphore::new(buffer), buffer);
118    let (tx, rx) = chan::channel(semaphore);
119
120    let tx = Sender::new(tx);
121    let rx = Receiver::new(rx);
122
123    (tx, rx)
124}
125
126/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
127/// representing the channel bound.
128type Semaphore = (::semaphore::Semaphore, usize);
129
130impl<T> Receiver<T> {
131    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
132        Receiver { chan }
133    }
134
135    /// Closes the receiving half of a channel, without dropping it.
136    ///
137    /// This prevents any further messages from being sent on the channel while
138    /// still enabling the receiver to drain messages that are buffered.
139    pub fn close(&mut self) {
140        self.chan.close();
141    }
142}
143
144impl<T> Stream for Receiver<T> {
145    type Item = T;
146    type Error = RecvError;
147
148    fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
149        self.chan.recv().map_err(|_| RecvError(()))
150    }
151}
152
153impl<T> Sender<T> {
154    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
155        Sender { chan }
156    }
157
158    /// Check if the `Sender` is ready to handle a value.
159    ///
160    /// Polls the channel to determine if there is guaranteed capacity to send
161    /// at least one item without waiting.
162    ///
163    /// When `poll_ready` returns `Ready`, the channel reserves capacity for one
164    /// message for this `Sender` instance. The capacity is held until a message
165    /// is send or the `Sender` instance is dropped. Callers should ensure a
166    /// message is sent in a timely fashion in order to not starve other
167    /// `Sender` instances.
168    ///
169    /// # Return value
170    ///
171    /// This method returns:
172    ///
173    /// - `Ok(Async::Ready(_))` if capacity is reserved for a single message.
174    /// - `Ok(Async::NotReady)` if the channel may not have capacity, in which
175    ///   case the current task is queued to be notified once
176    ///   capacity is available;
177    /// - `Err(SendError)` if the receiver has been dropped.
178    pub fn poll_ready(&mut self) -> Poll<(), SendError> {
179        self.chan.poll_ready().map_err(|_| SendError(()))
180    }
181
182    /// Attempts to send a message on this `Sender`, returning the message
183    /// if there was an error.
184    pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
185        self.chan.try_send(message)?;
186        Ok(())
187    }
188}
189
190impl<T> Sink for Sender<T> {
191    type SinkItem = T;
192    type SinkError = SendError;
193
194    fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
195        use futures::Async::*;
196        use futures::AsyncSink;
197
198        match self.poll_ready()? {
199            Ready(_) => {
200                self.try_send(msg).map_err(|_| SendError(()))?;
201                Ok(AsyncSink::Ready)
202            }
203            NotReady => Ok(AsyncSink::NotReady(msg)),
204        }
205    }
206
207    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
208        use futures::Async::Ready;
209        Ok(Ready(()))
210    }
211
212    fn close(&mut self) -> Poll<(), Self::SinkError> {
213        use futures::Async::Ready;
214        Ok(Ready(()))
215    }
216}
217
// ===== impl SendError =====
219
220impl fmt::Display for SendError {
221    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
222        use std::error::Error;
223        write!(fmt, "{}", self.description())
224    }
225}
226
227impl ::std::error::Error for SendError {
228    fn description(&self) -> &str {
229        "channel closed"
230    }
231}
232
// ===== impl TrySendError =====
234
235impl<T> TrySendError<T> {
236    /// Get the inner value.
237    pub fn into_inner(self) -> T {
238        self.value
239    }
240
241    /// Did the send fail because the channel has been closed?
242    pub fn is_closed(&self) -> bool {
243        if let ErrorKind::Closed = self.kind {
244            true
245        } else {
246            false
247        }
248    }
249
250    /// Did the send fail because the channel was at capacity?
251    pub fn is_full(&self) -> bool {
252        if let ErrorKind::NoCapacity = self.kind {
253            true
254        } else {
255            false
256        }
257    }
258}
259
260impl<T: fmt::Debug> fmt::Display for TrySendError<T> {
261    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
262        use std::error::Error;
263        write!(fmt, "{}", self.description())
264    }
265}
266
267impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {
268    fn description(&self) -> &str {
269        match self.kind {
270            ErrorKind::Closed => "channel closed",
271            ErrorKind::NoCapacity => "no available capacity",
272        }
273    }
274}
275
276impl<T> From<(T, chan::TrySendError)> for TrySendError<T> {
277    fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> {
278        TrySendError {
279            value,
280            kind: match err {
281                chan::TrySendError::Closed => ErrorKind::Closed,
282                chan::TrySendError::NoPermits => ErrorKind::NoCapacity,
283            },
284        }
285    }
286}
287
// ===== impl RecvError =====
289
290impl fmt::Display for RecvError {
291    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
292        use std::error::Error;
293        write!(fmt, "{}", self.description())
294    }
295}
296
297impl ::std::error::Error for RecvError {
298    fn description(&self) -> &str {
299        "channel closed"
300    }
301}