tokio_sync/mpsc/
bounded.rs1use super::chan;
2
3use futures::{Poll, Sink, StartSend, Stream};
4
5use std::fmt;
6
7pub struct Sender<T> {
11 chan: chan::Tx<T, Semaphore>,
12}
13
14impl<T> Clone for Sender<T> {
15 fn clone(&self) -> Self {
16 Sender {
17 chan: self.chan.clone(),
18 }
19 }
20}
21
22impl<T> fmt::Debug for Sender<T> {
23 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
24 fmt.debug_struct("Sender")
25 .field("chan", &self.chan)
26 .finish()
27 }
28}
29
30pub struct Receiver<T> {
34 chan: chan::Rx<T, Semaphore>,
36}
37
38impl<T> fmt::Debug for Receiver<T> {
39 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
40 fmt.debug_struct("Receiver")
41 .field("chan", &self.chan)
42 .finish()
43 }
44}
45
46#[derive(Debug)]
48pub struct SendError(());
49
50#[derive(Debug)]
52pub struct TrySendError<T> {
53 kind: ErrorKind,
54 value: T,
55}
56
57#[derive(Debug)]
58enum ErrorKind {
59 Closed,
60 NoCapacity,
61}
62
63#[derive(Debug)]
65pub struct RecvError(());
66
67pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
116 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
117 let semaphore = (::semaphore::Semaphore::new(buffer), buffer);
118 let (tx, rx) = chan::channel(semaphore);
119
120 let tx = Sender::new(tx);
121 let rx = Receiver::new(rx);
122
123 (tx, rx)
124}
125
126type Semaphore = (::semaphore::Semaphore, usize);
129
130impl<T> Receiver<T> {
131 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
132 Receiver { chan }
133 }
134
135 pub fn close(&mut self) {
140 self.chan.close();
141 }
142}
143
144impl<T> Stream for Receiver<T> {
145 type Item = T;
146 type Error = RecvError;
147
148 fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
149 self.chan.recv().map_err(|_| RecvError(()))
150 }
151}
152
153impl<T> Sender<T> {
154 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
155 Sender { chan }
156 }
157
158 pub fn poll_ready(&mut self) -> Poll<(), SendError> {
179 self.chan.poll_ready().map_err(|_| SendError(()))
180 }
181
182 pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
185 self.chan.try_send(message)?;
186 Ok(())
187 }
188}
189
190impl<T> Sink for Sender<T> {
191 type SinkItem = T;
192 type SinkError = SendError;
193
194 fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
195 use futures::Async::*;
196 use futures::AsyncSink;
197
198 match self.poll_ready()? {
199 Ready(_) => {
200 self.try_send(msg).map_err(|_| SendError(()))?;
201 Ok(AsyncSink::Ready)
202 }
203 NotReady => Ok(AsyncSink::NotReady(msg)),
204 }
205 }
206
207 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
208 use futures::Async::Ready;
209 Ok(Ready(()))
210 }
211
212 fn close(&mut self) -> Poll<(), Self::SinkError> {
213 use futures::Async::Ready;
214 Ok(Ready(()))
215 }
216}
217
218impl fmt::Display for SendError {
221 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
222 use std::error::Error;
223 write!(fmt, "{}", self.description())
224 }
225}
226
227impl ::std::error::Error for SendError {
228 fn description(&self) -> &str {
229 "channel closed"
230 }
231}
232
233impl<T> TrySendError<T> {
236 pub fn into_inner(self) -> T {
238 self.value
239 }
240
241 pub fn is_closed(&self) -> bool {
243 if let ErrorKind::Closed = self.kind {
244 true
245 } else {
246 false
247 }
248 }
249
250 pub fn is_full(&self) -> bool {
252 if let ErrorKind::NoCapacity = self.kind {
253 true
254 } else {
255 false
256 }
257 }
258}
259
260impl<T: fmt::Debug> fmt::Display for TrySendError<T> {
261 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
262 use std::error::Error;
263 write!(fmt, "{}", self.description())
264 }
265}
266
267impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {
268 fn description(&self) -> &str {
269 match self.kind {
270 ErrorKind::Closed => "channel closed",
271 ErrorKind::NoCapacity => "no available capacity",
272 }
273 }
274}
275
276impl<T> From<(T, chan::TrySendError)> for TrySendError<T> {
277 fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> {
278 TrySendError {
279 value,
280 kind: match err {
281 chan::TrySendError::Closed => ErrorKind::Closed,
282 chan::TrySendError::NoPermits => ErrorKind::NoCapacity,
283 },
284 }
285 }
286}
287
288impl fmt::Display for RecvError {
291 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
292 use std::error::Error;
293 write!(fmt, "{}", self.description())
294 }
295}
296
297impl ::std::error::Error for RecvError {
298 fn description(&self) -> &str {
299 "channel closed"
300 }
301}