tokio_core/net/udp/frame.rs
1use std::io;
2use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4};
3
4use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
5
6use net::UdpSocket;
7
8/// Encoding of frames via buffers.
9///
10/// This trait is used when constructing an instance of `UdpFramed` and provides
11/// the `In` and `Out` types which are decoded and encoded from the socket,
12/// respectively.
13///
14/// Because UDP is a connectionless protocol, the `decode` method receives the
15/// address where data came from and the `encode` method is also responsible for
16/// determining the remote host to which the datagram should be sent
17///
18/// The trait itself is implemented on a type that can track state for decoding
19/// or encoding, which is particularly useful for streaming parsers. In many
20/// cases, though, this type will simply be a unit struct (e.g. `struct
21/// HttpCodec`).
22pub trait UdpCodec {
23 /// The type of decoded frames.
24 type In;
25
26 /// The type of frames to be encoded.
27 type Out;
28
29 /// Attempts to decode a frame from the provided buffer of bytes.
30 ///
31 /// This method is called by `UdpFramed` on a single datagram which has been
32 /// read from a socket. The `buf` argument contains the data that was
33 /// received from the remote address, and `src` is the address the data came
34 /// from. Note that typically this method should require the entire contents
35 /// of `buf` to be valid or otherwise return an error with trailing data.
36 ///
37 /// Finally, if the bytes in the buffer are malformed then an error is
38 /// returned indicating why. This informs `Framed` that the stream is now
39 /// corrupt and should be terminated.
40 fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In>;
41
42 /// Encodes a frame into the buffer provided.
43 ///
44 /// This method will encode `msg` into the byte buffer provided by `buf`.
45 /// The `buf` provided is an internal buffer of the `Framed` instance and
46 /// will be written out when possible.
47 ///
48 /// The encode method also determines the destination to which the buffer
49 /// should be directed, which will be returned as a `SocketAddr`.
50 fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr;
51}
52
53/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using
54/// the `UdpCodec` trait to encode and decode frames.
55///
56/// You can acquire a `UdpFramed` instance by using the `UdpSocket::framed`
57/// adapter.
58#[must_use = "sinks do nothing unless polled"]
59pub struct UdpFramed<C> {
60 socket: UdpSocket,
61 codec: C,
62 rd: Vec<u8>,
63 wr: Vec<u8>,
64 out_addr: SocketAddr,
65 flushed: bool,
66}
67
68impl<C: UdpCodec> Stream for UdpFramed<C> {
69 type Item = C::In;
70 type Error = io::Error;
71
72 fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
73 let (n, addr) = try_nb!(self.socket.recv_from(&mut self.rd));
74 trace!("received {} bytes, decoding", n);
75 let frame = try!(self.codec.decode(&addr, &self.rd[..n]));
76 trace!("frame decoded from buffer");
77 Ok(Async::Ready(Some(frame)))
78 }
79}
80
81impl<C: UdpCodec> Sink for UdpFramed<C> {
82 type SinkItem = C::Out;
83 type SinkError = io::Error;
84
85 fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
86 trace!("sending frame");
87
88 if !self.flushed {
89 match try!(self.poll_complete()) {
90 Async::Ready(()) => {},
91 Async::NotReady => return Ok(AsyncSink::NotReady(item)),
92 }
93 }
94
95 self.out_addr = self.codec.encode(item, &mut self.wr);
96 self.flushed = false;
97 trace!("frame encoded; length={}", self.wr.len());
98
99 Ok(AsyncSink::Ready)
100 }
101
102 fn poll_complete(&mut self) -> Poll<(), io::Error> {
103 if self.flushed {
104 return Ok(Async::Ready(()))
105 }
106
107 trace!("flushing frame; length={}", self.wr.len());
108 let n = try_nb!(self.socket.send_to(&self.wr, &self.out_addr));
109 trace!("written {}", n);
110
111 let wrote_all = n == self.wr.len();
112 self.wr.clear();
113 self.flushed = true;
114
115 if wrote_all {
116 Ok(Async::Ready(()))
117 } else {
118 Err(io::Error::new(io::ErrorKind::Other,
119 "failed to write entire datagram to socket"))
120 }
121 }
122
123 fn close(&mut self) -> Poll<(), io::Error> {
124 try_ready!(self.poll_complete());
125 Ok(().into())
126 }
127}
128
129pub fn new<C: UdpCodec>(socket: UdpSocket, codec: C) -> UdpFramed<C> {
130 UdpFramed {
131 socket: socket,
132 codec: codec,
133 out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
134 rd: vec![0; 64 * 1024],
135 wr: Vec::with_capacity(8 * 1024),
136 flushed: true,
137 }
138}
139
140impl<C> UdpFramed<C> {
141 /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
142 ///
143 /// Note that care should be taken to not tamper with the underlying stream
144 /// of data coming in as it may corrupt the stream of frames otherwise being
145 /// worked with.
146 pub fn get_ref(&self) -> &UdpSocket {
147 &self.socket
148 }
149
150 /// Returns a mutable reference to the underlying I/O stream wrapped by
151 /// `Framed`.
152 ///
153 /// Note that care should be taken to not tamper with the underlying stream
154 /// of data coming in as it may corrupt the stream of frames otherwise being
155 /// worked with.
156 pub fn get_mut(&mut self) -> &mut UdpSocket {
157 &mut self.socket
158 }
159
160 /// Consumes the `Framed`, returning its underlying I/O stream.
161 ///
162 /// Note that care should be taken to not tamper with the underlying stream
163 /// of data coming in as it may corrupt the stream of frames otherwise being
164 /// worked with.
165 pub fn into_inner(self) -> UdpSocket {
166 self.socket
167 }
168}