futures_sink/
channel_impls.rs1use {Async, Sink, Poll};
2use futures_core::task;
3use futures_channel::mpsc::{Sender, SendError, UnboundedSender};
4
5impl<T> Sink for Sender<T> {
6 type SinkItem = T;
7 type SinkError = SendError;
8
9 fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
10 self.poll_ready(cx)
11 }
12
13 fn start_send(&mut self, msg: T) -> Result<(), Self::SinkError> {
14 self.start_send(msg)
15 }
16
17 fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
18 match self.poll_ready(cx) {
19 Err(ref e) if e.is_disconnected() => {
20 Ok(Async::Ready(()))
22 }
23 x => x,
24 }
25 }
26
27 fn poll_close(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
28 self.close_channel();
29 Ok(Async::Ready(()))
30 }
31}
32
33impl<T> Sink for UnboundedSender<T> {
34 type SinkItem = T;
35 type SinkError = SendError;
36
37 fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
38 UnboundedSender::poll_ready(&*self, cx)
39 }
40
41 fn start_send(&mut self, msg: T) -> Result<(), Self::SinkError> {
42 self.start_send(msg)
43 }
44
45 fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
46 Ok(Async::Ready(()))
47 }
48
49 fn poll_close(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
50 self.close_channel();
51 Ok(Async::Ready(()))
52 }
53}
54
55impl<'a, T> Sink for &'a UnboundedSender<T> {
56 type SinkItem = T;
57 type SinkError = SendError;
58
59 fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
60 UnboundedSender::poll_ready(*self, cx)
61 }
62
63 fn start_send(&mut self, msg: T) -> Result<(), Self::SinkError> {
64 self.unbounded_send(msg)
65 .map_err(|err| err.into_send_error())
66 }
67
68 fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
69 Ok(Async::Ready(()))
70 }
71
72 fn poll_close(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
73 self.close_channel();
74 Ok(Async::Ready(()))
75 }
76}