tokio_uds/
frame.rs

1use std::io;
2use std::os::unix::net::SocketAddr;
3use std::path::Path;
4
5use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
6
7use super::UnixDatagram;
8
9use bytes::{BufMut, BytesMut};
10use tokio_codec::{Decoder, Encoder};
11
12/// A unified `Stream` and `Sink` interface to an underlying `UnixDatagram`, using
13/// the `Encoder` and `Decoder` traits to encode and decode frames.
14///
15/// Unix datagram sockets work with datagrams, but higher-level code may wants to
16/// batch these into meaningful chunks, called "frames". This method layers
17/// framing on top of this socket by using the `Encoder` and `Decoder` traits to
18/// handle encoding and decoding of messages frames. Note that the incoming and
19/// outgoing frame types may be distinct.
20///
21/// This function returns a *single* object that is both `Stream` and `Sink`;
22/// grouping this into a single object is often useful for layering things which
23/// require both read and write access to the underlying object.
24///
25/// If you want to work more directly with the streams and sink, consider
26/// calling `split` on the `UnixDatagramFramed` returned by this method, which will break
27/// them into separate objects, allowing them to interact more easily.
28#[must_use = "sinks do nothing unless polled"]
29#[derive(Debug)]
30pub struct UnixDatagramFramed<A, C> {
31    socket: UnixDatagram,
32    codec: C,
33    rd: BytesMut,
34    wr: BytesMut,
35    out_addr: Option<A>,
36    flushed: bool,
37}
38
39impl<A, C: Decoder> Stream for UnixDatagramFramed<A, C> {
40    type Item = (C::Item, SocketAddr);
41    type Error = C::Error;
42
43    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
44        self.rd.reserve(INITIAL_RD_CAPACITY);
45
46        let (n, addr) = unsafe {
47            let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
48            self.rd.advance_mut(n);
49            (n, addr)
50        };
51        trace!("received {} bytes, decoding", n);
52        let frame_res = self.codec.decode(&mut self.rd);
53        self.rd.clear();
54        let frame = frame_res?;
55        let result = frame.map(|frame| (frame, addr));
56        trace!("frame decoded from buffer");
57        Ok(Async::Ready(result))
58    }
59}
60
61impl<A: AsRef<Path>, C: Encoder> Sink for UnixDatagramFramed<A, C> {
62    type SinkItem = (C::Item, A);
63    type SinkError = C::Error;
64
65    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
66        trace!("sending frame");
67
68        if !self.flushed {
69            match self.poll_complete()? {
70                Async::Ready(()) => {}
71                Async::NotReady => return Ok(AsyncSink::NotReady(item)),
72            }
73        }
74
75        let (frame, out_addr) = item;
76        self.codec.encode(frame, &mut self.wr)?;
77        self.out_addr = Some(out_addr);
78        self.flushed = false;
79        trace!("frame encoded; length={}", self.wr.len());
80
81        Ok(AsyncSink::Ready)
82    }
83
84    fn poll_complete(&mut self) -> Poll<(), C::Error> {
85        if self.flushed {
86            return Ok(Async::Ready(()));
87        }
88
89        let n = {
90            let out_path = match self.out_addr {
91                Some(ref out_path) => out_path.as_ref(),
92                None => {
93                    return Err(io::Error::new(
94                        io::ErrorKind::Other,
95                        "internal error: addr not available while data not flushed",
96                    )
97                    .into());
98                }
99            };
100
101            trace!("flushing frame; length={}", self.wr.len());
102            try_ready!(self.socket.poll_send_to(&self.wr, out_path))
103        };
104
105        trace!("written {}", n);
106
107        let wrote_all = n == self.wr.len();
108        self.wr.clear();
109        self.flushed = true;
110
111        if wrote_all {
112            self.out_addr = None;
113            Ok(Async::Ready(()))
114        } else {
115            Err(io::Error::new(
116                io::ErrorKind::Other,
117                "failed to write entire datagram to socket",
118            )
119            .into())
120        }
121    }
122
123    fn close(&mut self) -> Poll<(), C::Error> {
124        self.poll_complete()
125    }
126}
127
128const INITIAL_RD_CAPACITY: usize = 64 * 1024;
129const INITIAL_WR_CAPACITY: usize = 8 * 1024;
130
131impl<A, C> UnixDatagramFramed<A, C> {
132    /// Create a new `UnixDatagramFramed` backed by the given socket and codec.
133    ///
134    /// See struct level documentation for more details.
135    pub fn new(socket: UnixDatagram, codec: C) -> UnixDatagramFramed<A, C> {
136        UnixDatagramFramed {
137            socket: socket,
138            codec: codec,
139            out_addr: None,
140            rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
141            wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
142            flushed: true,
143        }
144    }
145
146    /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
147    ///
148    /// # Note
149    ///
150    /// Care should be taken to not tamper with the underlying stream of data
151    /// coming in as it may corrupt the stream of frames otherwise being worked
152    /// with.
153    pub fn get_ref(&self) -> &UnixDatagram {
154        &self.socket
155    }
156
157    /// Returns a mutable reference to the underlying I/O stream wrapped by
158    /// `Framed`.
159    ///
160    /// # Note
161    ///
162    /// Care should be taken to not tamper with the underlying stream of data
163    /// coming in as it may corrupt the stream of frames otherwise being worked
164    /// with.
165    pub fn get_mut(&mut self) -> &mut UnixDatagram {
166        &mut self.socket
167    }
168}