1use ucred::{self, UCred};
2
3use tokio_io::{AsyncRead, AsyncWrite};
4use tokio_reactor::{Handle, PollEvented};
5
6use bytes::{Buf, BufMut};
7use futures::{Async, Future, Poll};
8use iovec::{self, IoVec};
9use libc;
10use mio::Ready;
11use mio_uds;
12
13use std::fmt;
14use std::io::{self, Read, Write};
15use std::net::Shutdown;
16use std::os::unix::io::{AsRawFd, RawFd};
17use std::os::unix::net::{self, SocketAddr};
18use std::path::Path;
19
/// A Unix domain stream socket.
///
/// Wraps a `mio_uds::UnixStream` inside a `PollEvented`, which is the handle
/// the rest of this file uses for readiness polling and for the actual I/O.
pub struct UnixStream {
    // The underlying mio socket together with its readiness tracking.
    io: PollEvented<mio_uds::UnixStream>,
}
28
/// Future returned by `UnixStream::connect`.
///
/// Resolves to the connected `UnixStream`, or to the `io::Error` that
/// prevented the connection.
#[derive(Debug)]
pub struct ConnectFuture {
    // Current progress of the connection attempt.
    inner: State,
}
35
/// Internal state machine backing `ConnectFuture`.
#[derive(Debug)]
enum State {
    /// The socket was created and the future is waiting for it to become
    /// writable.
    Waiting(UnixStream),
    /// Creating the socket failed immediately; the error is handed out on the
    /// next poll.
    Error(io::Error),
    /// The future has already produced its result; polling again panics.
    Empty,
}
42
43impl UnixStream {
44 pub fn connect<P>(path: P) -> ConnectFuture
50 where
51 P: AsRef<Path>,
52 {
53 let res = mio_uds::UnixStream::connect(path).map(UnixStream::new);
54
55 let inner = match res {
56 Ok(stream) => State::Waiting(stream),
57 Err(e) => State::Error(e),
58 };
59
60 ConnectFuture { inner }
61 }
62
63 pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream> {
69 let stream = mio_uds::UnixStream::from_stream(stream)?;
70 let io = PollEvented::new_with_handle(stream, handle)?;
71
72 Ok(UnixStream { io })
73 }
74
75 pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
81 let (a, b) = mio_uds::UnixStream::pair()?;
82 let a = UnixStream::new(a);
83 let b = UnixStream::new(b);
84
85 Ok((a, b))
86 }
87
88 pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream {
89 let io = PollEvented::new(stream);
90 UnixStream { io }
91 }
92
93 pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
95 self.io.poll_read_ready(ready)
96 }
97
98 pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
100 self.io.poll_write_ready()
101 }
102
103 pub fn local_addr(&self) -> io::Result<SocketAddr> {
105 self.io.get_ref().local_addr()
106 }
107
108 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
110 self.io.get_ref().peer_addr()
111 }
112
113 pub fn peer_cred(&self) -> io::Result<UCred> {
115 ucred::get_peer_cred(self)
116 }
117
118 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
120 self.io.get_ref().take_error()
121 }
122
123 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
129 self.io.get_ref().shutdown(how)
130 }
131}
132
133impl Read for UnixStream {
134 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
135 self.io.read(buf)
136 }
137}
138
139impl Write for UnixStream {
140 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
141 self.io.write(buf)
142 }
143 fn flush(&mut self) -> io::Result<()> {
144 self.io.flush()
145 }
146}
147
148impl AsyncRead for UnixStream {
149 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
150 false
151 }
152
153 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
154 <&UnixStream>::read_buf(&mut &*self, buf)
155 }
156}
157
158impl AsyncWrite for UnixStream {
159 fn shutdown(&mut self) -> Poll<(), io::Error> {
160 <&UnixStream>::shutdown(&mut &*self)
161 }
162
163 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
164 <&UnixStream>::write_buf(&mut &*self, buf)
165 }
166}
167
168impl<'a> Read for &'a UnixStream {
169 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
170 (&self.io).read(buf)
171 }
172}
173
174impl<'a> Write for &'a UnixStream {
175 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
176 (&self.io).write(buf)
177 }
178
179 fn flush(&mut self) -> io::Result<()> {
180 (&self.io).flush()
181 }
182}
183
184impl<'a> AsyncRead for &'a UnixStream {
185 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
186 false
187 }
188
189 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
190 if let Async::NotReady = <UnixStream>::poll_read_ready(self, Ready::readable())? {
191 return Ok(Async::NotReady);
192 }
193 unsafe {
194 let r = read_ready(buf, self.as_raw_fd());
195 if r == -1 {
196 let e = io::Error::last_os_error();
197 if e.kind() == io::ErrorKind::WouldBlock {
198 self.io.clear_read_ready(Ready::readable())?;
199 Ok(Async::NotReady)
200 } else {
201 Err(e)
202 }
203 } else {
204 let r = r as usize;
205 buf.advance_mut(r);
206 Ok(r.into())
207 }
208 }
209 }
210}
211
212impl<'a> AsyncWrite for &'a UnixStream {
213 fn shutdown(&mut self) -> Poll<(), io::Error> {
214 Ok(().into())
215 }
216
217 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
218 if let Async::NotReady = <UnixStream>::poll_write_ready(self)? {
219 return Ok(Async::NotReady);
220 }
221 unsafe {
222 let r = write_ready(buf, self.as_raw_fd());
223 if r == -1 {
224 let e = io::Error::last_os_error();
225 if e.kind() == io::ErrorKind::WouldBlock {
226 self.io.clear_write_ready()?;
227 Ok(Async::NotReady)
228 } else {
229 Err(e)
230 }
231 } else {
232 let r = r as usize;
233 buf.advance(r);
234 Ok(r.into())
235 }
236 }
237 }
238}
239
240impl fmt::Debug for UnixStream {
241 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
242 self.io.get_ref().fmt(f)
243 }
244}
245
246impl AsRawFd for UnixStream {
247 fn as_raw_fd(&self) -> RawFd {
248 self.io.get_ref().as_raw_fd()
249 }
250}
251
252impl Future for ConnectFuture {
253 type Item = UnixStream;
254 type Error = io::Error;
255
256 fn poll(&mut self) -> Poll<UnixStream, io::Error> {
257 use std::mem;
258
259 match self.inner {
260 State::Waiting(ref mut stream) => {
261 if let Async::NotReady = stream.io.poll_write_ready()? {
262 return Ok(Async::NotReady);
263 }
264
265 if let Some(e) = stream.io.get_ref().take_error()? {
266 return Err(e);
267 }
268 }
269 State::Error(_) => {
270 let e = match mem::replace(&mut self.inner, State::Empty) {
271 State::Error(e) => e,
272 _ => unreachable!(),
273 };
274
275 return Err(e);
276 }
277 State::Empty => panic!("can't poll stream twice"),
278 }
279
280 match mem::replace(&mut self.inner, State::Empty) {
281 State::Waiting(stream) => Ok(Async::Ready(stream)),
282 _ => unreachable!(),
283 }
284 }
285}
286
287unsafe fn read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize {
288 let b1: &mut [u8] = &mut [0];
292 let b2: &mut [u8] = &mut [0];
293 let b3: &mut [u8] = &mut [0];
294 let b4: &mut [u8] = &mut [0];
295 let b5: &mut [u8] = &mut [0];
296 let b6: &mut [u8] = &mut [0];
297 let b7: &mut [u8] = &mut [0];
298 let b8: &mut [u8] = &mut [0];
299 let b9: &mut [u8] = &mut [0];
300 let b10: &mut [u8] = &mut [0];
301 let b11: &mut [u8] = &mut [0];
302 let b12: &mut [u8] = &mut [0];
303 let b13: &mut [u8] = &mut [0];
304 let b14: &mut [u8] = &mut [0];
305 let b15: &mut [u8] = &mut [0];
306 let b16: &mut [u8] = &mut [0];
307 let mut bufs: [&mut IoVec; 16] = [
308 b1.into(),
309 b2.into(),
310 b3.into(),
311 b4.into(),
312 b5.into(),
313 b6.into(),
314 b7.into(),
315 b8.into(),
316 b9.into(),
317 b10.into(),
318 b11.into(),
319 b12.into(),
320 b13.into(),
321 b14.into(),
322 b15.into(),
323 b16.into(),
324 ];
325
326 let n = buf.bytes_vec_mut(&mut bufs);
327 read_ready_vecs(&mut bufs[..n], raw_fd)
328}
329
330unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize {
331 let iovecs = iovec::unix::as_os_slice_mut(bufs);
332
333 libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
334}
335
336unsafe fn write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize {
337 static DUMMY: &[u8] = &[0];
341 let iovec = <&IoVec>::from(DUMMY);
342 let mut bufs = [
343 iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec,
344 iovec, iovec, iovec,
345 ];
346
347 let n = buf.bytes_vec(&mut bufs);
348 write_ready_vecs(&bufs[..n], raw_fd)
349}
350
351unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize {
352 let iovecs = iovec::unix::as_os_slice(bufs);
353
354 libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
355}