tokio_uds/
stream.rs

1use ucred::{self, UCred};
2
3use tokio_io::{AsyncRead, AsyncWrite};
4use tokio_reactor::{Handle, PollEvented};
5
6use bytes::{Buf, BufMut};
7use futures::{Async, Future, Poll};
8use iovec::{self, IoVec};
9use libc;
10use mio::Ready;
11use mio_uds;
12
13use std::fmt;
14use std::io::{self, Read, Write};
15use std::net::Shutdown;
16use std::os::unix::io::{AsRawFd, RawFd};
17use std::os::unix::net::{self, SocketAddr};
18use std::path::Path;
19
20/// A structure representing a connected Unix socket.
21///
22/// This socket can be connected directly with `UnixStream::connect` or accepted
23/// from a listener with `UnixListener::incoming`. Additionally, a pair of
24/// anonymous Unix sockets can be created with `UnixStream::pair`.
25pub struct UnixStream {
26    io: PollEvented<mio_uds::UnixStream>,
27}
28
29/// Future returned by `UnixStream::connect` which will resolve to a
30/// `UnixStream` when the stream is connected.
31#[derive(Debug)]
32pub struct ConnectFuture {
33    inner: State,
34}
35
36#[derive(Debug)]
37enum State {
38    Waiting(UnixStream),
39    Error(io::Error),
40    Empty,
41}
42
43impl UnixStream {
44    /// Connects to the socket named by `path`.
45    ///
46    /// This function will create a new Unix socket and connect to the path
47    /// specified, associating the returned stream with the default event loop's
48    /// handle.
49    pub fn connect<P>(path: P) -> ConnectFuture
50    where
51        P: AsRef<Path>,
52    {
53        let res = mio_uds::UnixStream::connect(path).map(UnixStream::new);
54
55        let inner = match res {
56            Ok(stream) => State::Waiting(stream),
57            Err(e) => State::Error(e),
58        };
59
60        ConnectFuture { inner }
61    }
62
63    /// Consumes a `UnixStream` in the standard library and returns a
64    /// nonblocking `UnixStream` from this crate.
65    ///
66    /// The returned stream will be associated with the given event loop
67    /// specified by `handle` and is ready to perform I/O.
68    pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream> {
69        let stream = mio_uds::UnixStream::from_stream(stream)?;
70        let io = PollEvented::new_with_handle(stream, handle)?;
71
72        Ok(UnixStream { io })
73    }
74
75    /// Creates an unnamed pair of connected sockets.
76    ///
77    /// This function will create a pair of interconnected Unix sockets for
78    /// communicating back and forth between one another. Each socket will
79    /// be associated with the default event loop's handle.
80    pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
81        let (a, b) = mio_uds::UnixStream::pair()?;
82        let a = UnixStream::new(a);
83        let b = UnixStream::new(b);
84
85        Ok((a, b))
86    }
87
88    pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream {
89        let io = PollEvented::new(stream);
90        UnixStream { io }
91    }
92
93    /// Test whether this socket is ready to be read or not.
94    pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
95        self.io.poll_read_ready(ready)
96    }
97
98    /// Test whether this socket is ready to be written to or not.
99    pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
100        self.io.poll_write_ready()
101    }
102
103    /// Returns the socket address of the local half of this connection.
104    pub fn local_addr(&self) -> io::Result<SocketAddr> {
105        self.io.get_ref().local_addr()
106    }
107
108    /// Returns the socket address of the remote half of this connection.
109    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
110        self.io.get_ref().peer_addr()
111    }
112
113    /// Returns effective credentials of the process which called `connect` or `pair`.
114    pub fn peer_cred(&self) -> io::Result<UCred> {
115        ucred::get_peer_cred(self)
116    }
117
118    /// Returns the value of the `SO_ERROR` option.
119    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
120        self.io.get_ref().take_error()
121    }
122
123    /// Shuts down the read, write, or both halves of this connection.
124    ///
125    /// This function will cause all pending and future I/O calls on the
126    /// specified portions to immediately return with an appropriate value
127    /// (see the documentation of `Shutdown`).
128    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
129        self.io.get_ref().shutdown(how)
130    }
131}
132
133impl Read for UnixStream {
134    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
135        self.io.read(buf)
136    }
137}
138
139impl Write for UnixStream {
140    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
141        self.io.write(buf)
142    }
143    fn flush(&mut self) -> io::Result<()> {
144        self.io.flush()
145    }
146}
147
148impl AsyncRead for UnixStream {
149    unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
150        false
151    }
152
153    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
154        <&UnixStream>::read_buf(&mut &*self, buf)
155    }
156}
157
158impl AsyncWrite for UnixStream {
159    fn shutdown(&mut self) -> Poll<(), io::Error> {
160        <&UnixStream>::shutdown(&mut &*self)
161    }
162
163    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
164        <&UnixStream>::write_buf(&mut &*self, buf)
165    }
166}
167
168impl<'a> Read for &'a UnixStream {
169    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
170        (&self.io).read(buf)
171    }
172}
173
174impl<'a> Write for &'a UnixStream {
175    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
176        (&self.io).write(buf)
177    }
178
179    fn flush(&mut self) -> io::Result<()> {
180        (&self.io).flush()
181    }
182}
183
184impl<'a> AsyncRead for &'a UnixStream {
185    unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
186        false
187    }
188
189    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
190        if let Async::NotReady = <UnixStream>::poll_read_ready(self, Ready::readable())? {
191            return Ok(Async::NotReady);
192        }
193        unsafe {
194            let r = read_ready(buf, self.as_raw_fd());
195            if r == -1 {
196                let e = io::Error::last_os_error();
197                if e.kind() == io::ErrorKind::WouldBlock {
198                    self.io.clear_read_ready(Ready::readable())?;
199                    Ok(Async::NotReady)
200                } else {
201                    Err(e)
202                }
203            } else {
204                let r = r as usize;
205                buf.advance_mut(r);
206                Ok(r.into())
207            }
208        }
209    }
210}
211
212impl<'a> AsyncWrite for &'a UnixStream {
213    fn shutdown(&mut self) -> Poll<(), io::Error> {
214        Ok(().into())
215    }
216
217    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
218        if let Async::NotReady = <UnixStream>::poll_write_ready(self)? {
219            return Ok(Async::NotReady);
220        }
221        unsafe {
222            let r = write_ready(buf, self.as_raw_fd());
223            if r == -1 {
224                let e = io::Error::last_os_error();
225                if e.kind() == io::ErrorKind::WouldBlock {
226                    self.io.clear_write_ready()?;
227                    Ok(Async::NotReady)
228                } else {
229                    Err(e)
230                }
231            } else {
232                let r = r as usize;
233                buf.advance(r);
234                Ok(r.into())
235            }
236        }
237    }
238}
239
240impl fmt::Debug for UnixStream {
241    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
242        self.io.get_ref().fmt(f)
243    }
244}
245
246impl AsRawFd for UnixStream {
247    fn as_raw_fd(&self) -> RawFd {
248        self.io.get_ref().as_raw_fd()
249    }
250}
251
252impl Future for ConnectFuture {
253    type Item = UnixStream;
254    type Error = io::Error;
255
256    fn poll(&mut self) -> Poll<UnixStream, io::Error> {
257        use std::mem;
258
259        match self.inner {
260            State::Waiting(ref mut stream) => {
261                if let Async::NotReady = stream.io.poll_write_ready()? {
262                    return Ok(Async::NotReady);
263                }
264
265                if let Some(e) = stream.io.get_ref().take_error()? {
266                    return Err(e);
267                }
268            }
269            State::Error(_) => {
270                let e = match mem::replace(&mut self.inner, State::Empty) {
271                    State::Error(e) => e,
272                    _ => unreachable!(),
273                };
274
275                return Err(e);
276            }
277            State::Empty => panic!("can't poll stream twice"),
278        }
279
280        match mem::replace(&mut self.inner, State::Empty) {
281            State::Waiting(stream) => Ok(Async::Ready(stream)),
282            _ => unreachable!(),
283        }
284    }
285}
286
287unsafe fn read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize {
288    // The `IoVec` type can't have a 0-length size, so we create a bunch
289    // of dummy versions on the stack with 1 length which we'll quickly
290    // overwrite.
291    let b1: &mut [u8] = &mut [0];
292    let b2: &mut [u8] = &mut [0];
293    let b3: &mut [u8] = &mut [0];
294    let b4: &mut [u8] = &mut [0];
295    let b5: &mut [u8] = &mut [0];
296    let b6: &mut [u8] = &mut [0];
297    let b7: &mut [u8] = &mut [0];
298    let b8: &mut [u8] = &mut [0];
299    let b9: &mut [u8] = &mut [0];
300    let b10: &mut [u8] = &mut [0];
301    let b11: &mut [u8] = &mut [0];
302    let b12: &mut [u8] = &mut [0];
303    let b13: &mut [u8] = &mut [0];
304    let b14: &mut [u8] = &mut [0];
305    let b15: &mut [u8] = &mut [0];
306    let b16: &mut [u8] = &mut [0];
307    let mut bufs: [&mut IoVec; 16] = [
308        b1.into(),
309        b2.into(),
310        b3.into(),
311        b4.into(),
312        b5.into(),
313        b6.into(),
314        b7.into(),
315        b8.into(),
316        b9.into(),
317        b10.into(),
318        b11.into(),
319        b12.into(),
320        b13.into(),
321        b14.into(),
322        b15.into(),
323        b16.into(),
324    ];
325
326    let n = buf.bytes_vec_mut(&mut bufs);
327    read_ready_vecs(&mut bufs[..n], raw_fd)
328}
329
330unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize {
331    let iovecs = iovec::unix::as_os_slice_mut(bufs);
332
333    libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
334}
335
336unsafe fn write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize {
337    // The `IoVec` type can't have a zero-length size, so create a dummy
338    // version from a 1-length slice which we'll overwrite with the
339    // `bytes_vec` method.
340    static DUMMY: &[u8] = &[0];
341    let iovec = <&IoVec>::from(DUMMY);
342    let mut bufs = [
343        iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec,
344        iovec, iovec, iovec,
345    ];
346
347    let n = buf.bytes_vec(&mut bufs);
348    write_ready_vecs(&bufs[..n], raw_fd)
349}
350
351unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize {
352    let iovecs = iovec::unix::as_os_slice(bufs);
353
354    libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
355}