tokio_udp/
frame.rs

1use std::io;
2use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
3
4use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
5
6use super::UdpSocket;
7
8use bytes::{BufMut, BytesMut};
9use tokio_codec::{Decoder, Encoder};
10
/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
///
/// Raw UDP sockets work with datagrams, but higher-level code usually wants to
/// batch these into meaningful chunks, called "frames". This type layers
/// framing on top of the socket by using the `Encoder` and `Decoder` traits to
/// handle encoding and decoding of message frames. Note that the incoming and
/// outgoing frame types may be distinct.
///
/// A `UdpFramed` is a *single* object that is both `Stream` and `Sink`;
/// grouping this into a single object is often useful for layering things which
/// require both read and write access to the underlying object.
///
/// If you want to work more directly with the stream and sink, consider
/// calling `split` on the `UdpFramed`, which will break it into separate
/// objects, allowing them to interact more easily.
#[must_use = "sinks do nothing unless polled"]
#[derive(Debug)]
pub struct UdpFramed<C> {
    /// The socket datagrams are received from and sent to.
    socket: UdpSocket,
    /// The codec used to decode incoming and encode outgoing frames.
    codec: C,
    /// Read buffer: bytes of the most recently received datagram.
    rd: BytesMut,
    /// Write buffer: the encoded frame waiting to be sent.
    wr: BytesMut,
    /// Destination address for the frame currently held in `wr`.
    out_addr: SocketAddr,
    /// `true` when there is no pending frame in `wr` left to send.
    flushed: bool,
    /// Repeat-decode mode only: `true` while `rd` may still hold frames that
    /// have not been handed to the caller yet.
    is_readable: bool,
    /// When `true`, the stream keeps calling `decode_eof` on a datagram until
    /// the decoder returns `None`; when `false`, each datagram is decoded once.
    repeat_decode: bool,
    /// Repeat-decode mode only: sender of the datagram currently in `rd`.
    current_addr: Option<SocketAddr>,
}
40
41impl<C: Decoder> Stream for UdpFramed<C> {
42    type Item = (C::Item, SocketAddr);
43    type Error = C::Error;
44
45    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
46        self.rd.reserve(INITIAL_RD_CAPACITY);
47
48        if self.repeat_decode {
49            loop {
50                // Are there are still bytes left in the read buffer to decode?
51                if self.is_readable {
52                    // Use deocde_eof since every datagram contains its own
53                    // eof which is just the end of the datagram. This supports
54                    // the lines use case where there may not be a terminating
55                    // delimiter and thus you may never get the end of the frame.
56                    // This is generally fine for most implementations of codec
57                    // since by default this will defer to calling decode.
58                    if let Some(frame) = self.codec.decode_eof(&mut self.rd)? {
59                        trace!("frame decoded from buffer");
60
61                        let current_addr = self
62                            .current_addr
63                            .expect("will always be set before this line is called");
64
65                        return Ok(Async::Ready(Some((frame, current_addr))));
66                    }
67
68                    // if this line has been reached then decode has returned `None`.
69                    self.is_readable = false;
70                    self.rd.clear();
71                }
72
73                // We're out of data. Try and fetch more data to decode
74                let (n, addr) = unsafe {
75                    // Read into the buffer without having to initialize the memory.
76                    let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
77                    self.rd.advance_mut(n);
78                    (n, addr)
79                };
80
81                self.current_addr = Some(addr);
82                self.is_readable = true;
83
84                trace!("received {} bytes, decoding", n);
85            }
86        } else {
87            let (n, addr) = unsafe {
88                // Read into the buffer without having to initialize the memory.
89                let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
90                self.rd.advance_mut(n);
91                (n, addr)
92            };
93            trace!("received {} bytes, decoding", n);
94            let frame_res = self.codec.decode(&mut self.rd);
95            self.rd.clear();
96            let frame = frame_res?;
97            let result = frame.map(|frame| (frame, addr)); // frame -> (frame, addr)
98            trace!("frame decoded from buffer");
99            Ok(Async::Ready(result))
100        }
101    }
102}
103
104impl<C: Encoder> Sink for UdpFramed<C> {
105    type SinkItem = (C::Item, SocketAddr);
106    type SinkError = C::Error;
107
108    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
109        trace!("sending frame");
110
111        if !self.flushed {
112            match self.poll_complete()? {
113                Async::Ready(()) => {}
114                Async::NotReady => return Ok(AsyncSink::NotReady(item)),
115            }
116        }
117
118        let (frame, out_addr) = item;
119        self.codec.encode(frame, &mut self.wr)?;
120        self.out_addr = out_addr;
121        self.flushed = false;
122        trace!("frame encoded; length={}", self.wr.len());
123
124        Ok(AsyncSink::Ready)
125    }
126
127    fn poll_complete(&mut self) -> Poll<(), C::Error> {
128        if self.flushed {
129            return Ok(Async::Ready(()));
130        }
131
132        trace!("flushing frame; length={}", self.wr.len());
133        let n = try_ready!(self.socket.poll_send_to(&self.wr, &self.out_addr));
134        trace!("written {}", n);
135
136        let wrote_all = n == self.wr.len();
137        self.wr.clear();
138        self.flushed = true;
139
140        if wrote_all {
141            Ok(Async::Ready(()))
142        } else {
143            Err(io::Error::new(
144                io::ErrorKind::Other,
145                "failed to write entire datagram to socket",
146            )
147            .into())
148        }
149    }
150
151    fn close(&mut self) -> Poll<(), C::Error> {
152        try_ready!(self.poll_complete());
153        Ok(().into())
154    }
155}
156
/// Initial capacity of the read buffer (64 KiB); also the amount of spare room
/// `poll` reserves in it before reading from the socket.
const INITIAL_RD_CAPACITY: usize = 64 * 1024;
/// Initial capacity of the write buffer (8 KiB) that frames are encoded into.
const INITIAL_WR_CAPACITY: usize = 8 * 1024;
159
160impl<C> UdpFramed<C> {
161    /// Create a new `UdpFramed` backed by the given socket and codec.
162    ///
163    /// See struct level documentation for more details.
164    pub fn new(socket: UdpSocket, codec: C) -> UdpFramed<C> {
165        UdpFramed {
166            socket: socket,
167            codec: codec,
168            out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
169            rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
170            wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
171            flushed: true,
172            is_readable: false,
173            repeat_decode: false,
174            current_addr: None,
175        }
176    }
177
178    /// Create a new `UdpFramed` backed by the given socket and codec. That will
179    /// continue to call `decode_eof` until the decoder has cleared the entire buffer.
180    ///
181    /// See struct level documentation for more details.
182    pub fn with_decode(socket: UdpSocket, codec: C, repeat_decode: bool) -> UdpFramed<C> {
183        UdpFramed {
184            socket: socket,
185            codec: codec,
186            out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
187            rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
188            wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
189            flushed: true,
190            is_readable: false,
191            repeat_decode,
192            current_addr: None,
193        }
194    }
195
196    /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
197    ///
198    /// # Note
199    ///
200    /// Care should be taken to not tamper with the underlying stream of data
201    /// coming in as it may corrupt the stream of frames otherwise being worked
202    /// with.
203    pub fn get_ref(&self) -> &UdpSocket {
204        &self.socket
205    }
206
207    /// Returns a mutable reference to the underlying I/O stream wrapped by
208    /// `Framed`.
209    ///
210    /// # Note
211    ///
212    /// Care should be taken to not tamper with the underlying stream of data
213    /// coming in as it may corrupt the stream of frames otherwise being worked
214    /// with.
215    pub fn get_mut(&mut self) -> &mut UdpSocket {
216        &mut self.socket
217    }
218
219    /// Consumes the `Framed`, returning its underlying I/O stream.
220    pub fn into_inner(self) -> UdpSocket {
221        self.socket
222    }
223}