1use std::io;
2use std::os::unix::net::SocketAddr;
3use std::path::Path;
4
5use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
6
7use super::UnixDatagram;
8
9use bytes::{BufMut, BytesMut};
10use tokio_codec::{Decoder, Encoder};
11
12#[must_use = "sinks do nothing unless polled"]
29#[derive(Debug)]
30pub struct UnixDatagramFramed<A, C> {
31 socket: UnixDatagram,
32 codec: C,
33 rd: BytesMut,
34 wr: BytesMut,
35 out_addr: Option<A>,
36 flushed: bool,
37}
38
39impl<A, C: Decoder> Stream for UnixDatagramFramed<A, C> {
40 type Item = (C::Item, SocketAddr);
41 type Error = C::Error;
42
43 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
44 self.rd.reserve(INITIAL_RD_CAPACITY);
45
46 let (n, addr) = unsafe {
47 let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
48 self.rd.advance_mut(n);
49 (n, addr)
50 };
51 trace!("received {} bytes, decoding", n);
52 let frame_res = self.codec.decode(&mut self.rd);
53 self.rd.clear();
54 let frame = frame_res?;
55 let result = frame.map(|frame| (frame, addr));
56 trace!("frame decoded from buffer");
57 Ok(Async::Ready(result))
58 }
59}
60
61impl<A: AsRef<Path>, C: Encoder> Sink for UnixDatagramFramed<A, C> {
62 type SinkItem = (C::Item, A);
63 type SinkError = C::Error;
64
65 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
66 trace!("sending frame");
67
68 if !self.flushed {
69 match self.poll_complete()? {
70 Async::Ready(()) => {}
71 Async::NotReady => return Ok(AsyncSink::NotReady(item)),
72 }
73 }
74
75 let (frame, out_addr) = item;
76 self.codec.encode(frame, &mut self.wr)?;
77 self.out_addr = Some(out_addr);
78 self.flushed = false;
79 trace!("frame encoded; length={}", self.wr.len());
80
81 Ok(AsyncSink::Ready)
82 }
83
84 fn poll_complete(&mut self) -> Poll<(), C::Error> {
85 if self.flushed {
86 return Ok(Async::Ready(()));
87 }
88
89 let n = {
90 let out_path = match self.out_addr {
91 Some(ref out_path) => out_path.as_ref(),
92 None => {
93 return Err(io::Error::new(
94 io::ErrorKind::Other,
95 "internal error: addr not available while data not flushed",
96 )
97 .into());
98 }
99 };
100
101 trace!("flushing frame; length={}", self.wr.len());
102 try_ready!(self.socket.poll_send_to(&self.wr, out_path))
103 };
104
105 trace!("written {}", n);
106
107 let wrote_all = n == self.wr.len();
108 self.wr.clear();
109 self.flushed = true;
110
111 if wrote_all {
112 self.out_addr = None;
113 Ok(Async::Ready(()))
114 } else {
115 Err(io::Error::new(
116 io::ErrorKind::Other,
117 "failed to write entire datagram to socket",
118 )
119 .into())
120 }
121 }
122
123 fn close(&mut self) -> Poll<(), C::Error> {
124 self.poll_complete()
125 }
126}
127
128const INITIAL_RD_CAPACITY: usize = 64 * 1024;
129const INITIAL_WR_CAPACITY: usize = 8 * 1024;
130
131impl<A, C> UnixDatagramFramed<A, C> {
132 pub fn new(socket: UnixDatagram, codec: C) -> UnixDatagramFramed<A, C> {
136 UnixDatagramFramed {
137 socket: socket,
138 codec: codec,
139 out_addr: None,
140 rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
141 wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
142 flushed: true,
143 }
144 }
145
146 pub fn get_ref(&self) -> &UnixDatagram {
154 &self.socket
155 }
156
157 pub fn get_mut(&mut self) -> &mut UnixDatagram {
166 &mut self.socket
167 }
168}