futures_sink/
channel_impls.rs

1use {Async, Sink, Poll};
2use futures_core::task;
3use futures_channel::mpsc::{Sender, SendError, UnboundedSender};
4
5impl<T> Sink for Sender<T> {
6    type SinkItem = T;
7    type SinkError = SendError;
8
9    fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
10        self.poll_ready(cx)
11    }
12
13    fn start_send(&mut self, msg: T) -> Result<(), Self::SinkError> {
14        self.start_send(msg)
15    }
16
17    fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
18        match self.poll_ready(cx) {
19            Err(ref e) if e.is_disconnected() => {
20                // If the receiver disconnected, we consider the sink to be flushed.
21                Ok(Async::Ready(()))
22            }
23            x => x,
24        }
25    }
26
27    fn poll_close(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
28        self.close_channel();
29        Ok(Async::Ready(()))
30    }
31}
32
33impl<T> Sink for UnboundedSender<T> {
34    type SinkItem = T;
35    type SinkError = SendError;
36
37    fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
38        UnboundedSender::poll_ready(&*self, cx)
39    }
40
41    fn start_send(&mut self, msg: T) -> Result<(), Self::SinkError> {
42        self.start_send(msg)
43    }
44
45    fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
46        Ok(Async::Ready(()))
47    }
48
49    fn poll_close(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
50        self.close_channel();
51        Ok(Async::Ready(()))
52    }
53}
54
55impl<'a, T> Sink for &'a UnboundedSender<T> {
56    type SinkItem = T;
57    type SinkError = SendError;
58
59    fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
60        UnboundedSender::poll_ready(*self, cx)
61    }
62
63    fn start_send(&mut self, msg: T) -> Result<(), Self::SinkError> {
64        self.unbounded_send(msg)
65            .map_err(|err| err.into_send_error())
66    }
67
68    fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
69        Ok(Async::Ready(()))
70    }
71
72    fn poll_close(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
73        self.close_channel();
74        Ok(Async::Ready(()))
75    }
76}