tokio_core/net/udp/
frame.rs

1use std::io;
2use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4};
3
4use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
5
6use net::UdpSocket;
7
/// Encoding of frames via buffers.
///
/// This trait is used when constructing an instance of `UdpFramed` and provides
/// the `In` and `Out` types which are decoded from and encoded to the socket,
/// respectively.
///
/// Because UDP is a connectionless protocol, the `decode` method receives the
/// address the data came from, and the `encode` method is responsible for
/// determining the remote host to which the datagram should be sent.
///
/// The trait itself is implemented on a type that can track state for decoding
/// or encoding, which is why both methods take `&mut self`. In many cases,
/// though, this type will simply be a unit struct.
pub trait UdpCodec {
    /// The type of decoded frames.
    type In;

    /// The type of frames to be encoded.
    type Out;

    /// Attempts to decode a frame from the provided buffer of bytes.
    ///
    /// This method is called by `UdpFramed` once for every datagram read from
    /// the socket. The `buf` argument contains exactly the bytes of that
    /// datagram, and `src` is the address the datagram came from. Each call is
    /// independent of the previous one as far as `UdpFramed` is concerned, so
    /// an implementation should typically require the entire contents of `buf`
    /// to be valid and treat trailing data as an error.
    ///
    /// # Errors
    ///
    /// If the bytes in the buffer are malformed, return an error describing
    /// why. `UdpFramed` passes that error on to whoever is polling it as a
    /// `Stream`.
    fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In>;

    /// Encodes a frame into the buffer provided.
    ///
    /// This method encodes `msg` into the byte buffer `buf`. The buffer is
    /// owned by the `UdpFramed` instance, which sends its contents as a single
    /// datagram when the socket is ready.
    ///
    /// The return value is the destination the encoded datagram should be
    /// sent to.
    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr;
}
52
53/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using
54/// the `UdpCodec` trait to encode and decode frames.
55///
56/// You can acquire a `UdpFramed` instance by using the `UdpSocket::framed`
57/// adapter.
58#[must_use = "sinks do nothing unless polled"]
59pub struct UdpFramed<C> {
60    socket: UdpSocket,
61    codec: C,
62    rd: Vec<u8>,
63    wr: Vec<u8>,
64    out_addr: SocketAddr,
65    flushed: bool,
66}
67
68impl<C: UdpCodec> Stream for UdpFramed<C> {
69    type Item = C::In;
70    type Error = io::Error;
71
72    fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
73        let (n, addr) = try_nb!(self.socket.recv_from(&mut self.rd));
74        trace!("received {} bytes, decoding", n);
75        let frame = try!(self.codec.decode(&addr, &self.rd[..n]));
76        trace!("frame decoded from buffer");
77        Ok(Async::Ready(Some(frame)))
78    }
79}
80
81impl<C: UdpCodec> Sink for UdpFramed<C> {
82    type SinkItem = C::Out;
83    type SinkError = io::Error;
84
85    fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
86        trace!("sending frame");
87
88        if !self.flushed {
89            match try!(self.poll_complete()) {
90                Async::Ready(()) => {},
91                Async::NotReady => return Ok(AsyncSink::NotReady(item)),
92            }
93        }
94
95        self.out_addr = self.codec.encode(item, &mut self.wr);
96        self.flushed = false;
97        trace!("frame encoded; length={}", self.wr.len());
98
99        Ok(AsyncSink::Ready)
100    }
101
102    fn poll_complete(&mut self) -> Poll<(), io::Error> {
103        if self.flushed {
104            return Ok(Async::Ready(()))
105        }
106
107        trace!("flushing frame; length={}", self.wr.len());
108        let n = try_nb!(self.socket.send_to(&self.wr, &self.out_addr));
109        trace!("written {}", n);
110
111        let wrote_all = n == self.wr.len();
112        self.wr.clear();
113        self.flushed = true;
114
115        if wrote_all {
116            Ok(Async::Ready(()))
117        } else {
118            Err(io::Error::new(io::ErrorKind::Other,
119                               "failed to write entire datagram to socket"))
120        }
121    }
122
123    fn close(&mut self) -> Poll<(), io::Error> {
124        try_ready!(self.poll_complete());
125        Ok(().into())
126    }
127}
128
129pub fn new<C: UdpCodec>(socket: UdpSocket, codec: C) -> UdpFramed<C> {
130    UdpFramed {
131        socket: socket,
132        codec: codec,
133        out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
134        rd: vec![0; 64 * 1024],
135        wr: Vec::with_capacity(8 * 1024),
136        flushed: true,
137    }
138}
139
140impl<C> UdpFramed<C> {
141    /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
142    ///
143    /// Note that care should be taken to not tamper with the underlying stream
144    /// of data coming in as it may corrupt the stream of frames otherwise being
145    /// worked with.
146    pub fn get_ref(&self) -> &UdpSocket {
147        &self.socket
148    }
149
150    /// Returns a mutable reference to the underlying I/O stream wrapped by
151    /// `Framed`.
152    ///
153    /// Note that care should be taken to not tamper with the underlying stream
154    /// of data coming in as it may corrupt the stream of frames otherwise being
155    /// worked with.
156    pub fn get_mut(&mut self) -> &mut UdpSocket {
157        &mut self.socket
158    }
159
160    /// Consumes the `Framed`, returning its underlying I/O stream.
161    ///
162    /// Note that care should be taken to not tamper with the underlying stream
163    /// of data coming in as it may corrupt the stream of frames otherwise being
164    /// worked with.
165    pub fn into_inner(self) -> UdpSocket {
166        self.socket
167    }
168}