1use std::io;
2use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
3
4use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
5
6use super::UdpSocket;
7
8use bytes::{BufMut, BytesMut};
9use tokio_codec::{Decoder, Encoder};
10
11#[must_use = "sinks do nothing unless polled"]
28#[derive(Debug)]
29pub struct UdpFramed<C> {
30 socket: UdpSocket,
31 codec: C,
32 rd: BytesMut,
33 wr: BytesMut,
34 out_addr: SocketAddr,
35 flushed: bool,
36 is_readable: bool,
37 repeat_decode: bool,
38 current_addr: Option<SocketAddr>,
39}
40
41impl<C: Decoder> Stream for UdpFramed<C> {
42 type Item = (C::Item, SocketAddr);
43 type Error = C::Error;
44
45 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
46 self.rd.reserve(INITIAL_RD_CAPACITY);
47
48 if self.repeat_decode {
49 loop {
50 if self.is_readable {
52 if let Some(frame) = self.codec.decode_eof(&mut self.rd)? {
59 trace!("frame decoded from buffer");
60
61 let current_addr = self
62 .current_addr
63 .expect("will always be set before this line is called");
64
65 return Ok(Async::Ready(Some((frame, current_addr))));
66 }
67
68 self.is_readable = false;
70 self.rd.clear();
71 }
72
73 let (n, addr) = unsafe {
75 let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
77 self.rd.advance_mut(n);
78 (n, addr)
79 };
80
81 self.current_addr = Some(addr);
82 self.is_readable = true;
83
84 trace!("received {} bytes, decoding", n);
85 }
86 } else {
87 let (n, addr) = unsafe {
88 let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
90 self.rd.advance_mut(n);
91 (n, addr)
92 };
93 trace!("received {} bytes, decoding", n);
94 let frame_res = self.codec.decode(&mut self.rd);
95 self.rd.clear();
96 let frame = frame_res?;
97 let result = frame.map(|frame| (frame, addr)); trace!("frame decoded from buffer");
99 Ok(Async::Ready(result))
100 }
101 }
102}
103
104impl<C: Encoder> Sink for UdpFramed<C> {
105 type SinkItem = (C::Item, SocketAddr);
106 type SinkError = C::Error;
107
108 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
109 trace!("sending frame");
110
111 if !self.flushed {
112 match self.poll_complete()? {
113 Async::Ready(()) => {}
114 Async::NotReady => return Ok(AsyncSink::NotReady(item)),
115 }
116 }
117
118 let (frame, out_addr) = item;
119 self.codec.encode(frame, &mut self.wr)?;
120 self.out_addr = out_addr;
121 self.flushed = false;
122 trace!("frame encoded; length={}", self.wr.len());
123
124 Ok(AsyncSink::Ready)
125 }
126
127 fn poll_complete(&mut self) -> Poll<(), C::Error> {
128 if self.flushed {
129 return Ok(Async::Ready(()));
130 }
131
132 trace!("flushing frame; length={}", self.wr.len());
133 let n = try_ready!(self.socket.poll_send_to(&self.wr, &self.out_addr));
134 trace!("written {}", n);
135
136 let wrote_all = n == self.wr.len();
137 self.wr.clear();
138 self.flushed = true;
139
140 if wrote_all {
141 Ok(Async::Ready(()))
142 } else {
143 Err(io::Error::new(
144 io::ErrorKind::Other,
145 "failed to write entire datagram to socket",
146 )
147 .into())
148 }
149 }
150
151 fn close(&mut self) -> Poll<(), C::Error> {
152 try_ready!(self.poll_complete());
153 Ok(().into())
154 }
155}
156
157const INITIAL_RD_CAPACITY: usize = 64 * 1024;
158const INITIAL_WR_CAPACITY: usize = 8 * 1024;
159
160impl<C> UdpFramed<C> {
161 pub fn new(socket: UdpSocket, codec: C) -> UdpFramed<C> {
165 UdpFramed {
166 socket: socket,
167 codec: codec,
168 out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
169 rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
170 wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
171 flushed: true,
172 is_readable: false,
173 repeat_decode: false,
174 current_addr: None,
175 }
176 }
177
178 pub fn with_decode(socket: UdpSocket, codec: C, repeat_decode: bool) -> UdpFramed<C> {
183 UdpFramed {
184 socket: socket,
185 codec: codec,
186 out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
187 rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
188 wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
189 flushed: true,
190 is_readable: false,
191 repeat_decode,
192 current_addr: None,
193 }
194 }
195
196 pub fn get_ref(&self) -> &UdpSocket {
204 &self.socket
205 }
206
207 pub fn get_mut(&mut self) -> &mut UdpSocket {
216 &mut self.socket
217 }
218
219 pub fn into_inner(self) -> UdpSocket {
221 self.socket
222 }
223}