madsim_real_tokio/net/unix/
stream.rs

1use crate::future::poll_fn;
2use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
3use crate::net::unix::split::{split, ReadHalf, WriteHalf};
4use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
5use crate::net::unix::ucred::{self, UCred};
6use crate::net::unix::SocketAddr;
7
8use std::fmt;
9use std::io::{self, Read, Write};
10use std::net::Shutdown;
11use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
12use std::os::unix::net;
13use std::path::Path;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17cfg_io_util! {
18    use bytes::BufMut;
19}
20
21cfg_net_unix! {
22    /// A structure representing a connected Unix socket.
23    ///
24    /// This socket can be connected directly with [`UnixStream::connect`] or accepted
25    /// from a listener with [`UnixListener::accept`]. Additionally, a pair of
26    /// anonymous Unix sockets can be created with `UnixStream::pair`.
27    ///
28    /// To shut down the stream in the write direction, you can call the
29    /// [`shutdown()`] method. This will cause the other peer to receive a read of
30    /// length 0, indicating that no more data will be sent. This only closes
31    /// the stream in one direction.
32    ///
33    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
34    /// [`UnixListener::accept`]: crate::net::UnixListener::accept
35    #[cfg_attr(docsrs, doc(alias = "uds"))]
36    pub struct UnixStream {
37        io: PollEvented<mio::net::UnixStream>,
38    }
39}
40
41impl UnixStream {
42    pub(crate) async fn connect_mio(sys: mio::net::UnixStream) -> io::Result<UnixStream> {
43        let stream = UnixStream::new(sys)?;
44
45        // Once we've connected, wait for the stream to be writable as
46        // that's when the actual connection has been initiated. Once we're
47        // writable we check for `take_socket_error` to see if the connect
48        // actually hit an error or not.
49        //
50        // If all that succeeded then we ship everything on up.
51        poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
52
53        if let Some(e) = stream.io.take_error()? {
54            return Err(e);
55        }
56
57        Ok(stream)
58    }
59
60    /// Connects to the socket named by `path`.
61    ///
62    /// This function will create a new Unix socket and connect to the path
63    /// specified, associating the returned stream with the default event loop's
64    /// handle.
65    pub async fn connect<P>(path: P) -> io::Result<UnixStream>
66    where
67        P: AsRef<Path>,
68    {
69        let stream = mio::net::UnixStream::connect(path)?;
70        let stream = UnixStream::new(stream)?;
71
72        poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
73
74        if let Some(e) = stream.io.take_error()? {
75            return Err(e);
76        }
77
78        Ok(stream)
79    }
80
81    /// Waits for any of the requested ready states.
82    ///
83    /// This function is usually paired with `try_read()` or `try_write()`. It
84    /// can be used to concurrently read / write to the same socket on a single
85    /// task without splitting the socket.
86    ///
87    /// The function may complete without the socket being ready. This is a
88    /// false-positive and attempting an operation will return with
89    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
90    /// [`Ready`] set, so you should always check the returned value and possibly
91    /// wait again if the requested states are not set.
92    ///
93    /// # Cancel safety
94    ///
95    /// This method is cancel safe. Once a readiness event occurs, the method
96    /// will continue to return immediately until the readiness event is
97    /// consumed by an attempt to read or write that fails with `WouldBlock` or
98    /// `Poll::Pending`.
99    ///
100    /// # Examples
101    ///
102    /// Concurrently read and write to the stream on the same task without
103    /// splitting.
104    ///
105    /// ```no_run
106    /// use tokio::io::Interest;
107    /// use tokio::net::UnixStream;
108    /// use std::error::Error;
109    /// use std::io;
110    ///
111    /// #[tokio::main]
112    /// async fn main() -> Result<(), Box<dyn Error>> {
113    ///     let dir = tempfile::tempdir().unwrap();
114    ///     let bind_path = dir.path().join("bind_path");
115    ///     let stream = UnixStream::connect(bind_path).await?;
116    ///
117    ///     loop {
118    ///         let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
119    ///
120    ///         if ready.is_readable() {
121    ///             let mut data = vec![0; 1024];
122    ///             // Try to read data, this may still fail with `WouldBlock`
123    ///             // if the readiness event is a false positive.
124    ///             match stream.try_read(&mut data) {
125    ///                 Ok(n) => {
    ///                     println!("read {} bytes", n);
127    ///                 }
128    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
129    ///                     continue;
130    ///                 }
131    ///                 Err(e) => {
132    ///                     return Err(e.into());
133    ///                 }
134    ///             }
135    ///
136    ///         }
137    ///
138    ///         if ready.is_writable() {
139    ///             // Try to write data, this may still fail with `WouldBlock`
140    ///             // if the readiness event is a false positive.
141    ///             match stream.try_write(b"hello world") {
142    ///                 Ok(n) => {
143    ///                     println!("write {} bytes", n);
144    ///                 }
145    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
146    ///                     continue;
147    ///                 }
148    ///                 Err(e) => {
149    ///                     return Err(e.into());
150    ///                 }
151    ///             }
152    ///         }
153    ///     }
154    /// }
155    /// ```
156    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
157        let event = self.io.registration().readiness(interest).await?;
158        Ok(event.ready)
159    }
160
161    /// Waits for the socket to become readable.
162    ///
163    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
164    /// paired with `try_read()`.
165    ///
166    /// # Cancel safety
167    ///
168    /// This method is cancel safe. Once a readiness event occurs, the method
169    /// will continue to return immediately until the readiness event is
170    /// consumed by an attempt to read that fails with `WouldBlock` or
171    /// `Poll::Pending`.
172    ///
173    /// # Examples
174    ///
175    /// ```no_run
176    /// use tokio::net::UnixStream;
177    /// use std::error::Error;
178    /// use std::io;
179    ///
180    /// #[tokio::main]
181    /// async fn main() -> Result<(), Box<dyn Error>> {
182    ///     // Connect to a peer
183    ///     let dir = tempfile::tempdir().unwrap();
184    ///     let bind_path = dir.path().join("bind_path");
185    ///     let stream = UnixStream::connect(bind_path).await?;
186    ///
187    ///     let mut msg = vec![0; 1024];
188    ///
189    ///     loop {
190    ///         // Wait for the socket to be readable
191    ///         stream.readable().await?;
192    ///
193    ///         // Try to read data, this may still fail with `WouldBlock`
194    ///         // if the readiness event is a false positive.
195    ///         match stream.try_read(&mut msg) {
196    ///             Ok(n) => {
197    ///                 msg.truncate(n);
198    ///                 break;
199    ///             }
200    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
201    ///                 continue;
202    ///             }
203    ///             Err(e) => {
204    ///                 return Err(e.into());
205    ///             }
206    ///         }
207    ///     }
208    ///
209    ///     println!("GOT = {:?}", msg);
210    ///     Ok(())
211    /// }
212    /// ```
213    pub async fn readable(&self) -> io::Result<()> {
214        self.ready(Interest::READABLE).await?;
215        Ok(())
216    }
217
218    /// Polls for read readiness.
219    ///
220    /// If the unix stream is not currently ready for reading, this method will
221    /// store a clone of the `Waker` from the provided `Context`. When the unix
222    /// stream becomes ready for reading, `Waker::wake` will be called on the
223    /// waker.
224    ///
225    /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
226    /// the `Waker` from the `Context` passed to the most recent call is
227    /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
228    /// second, independent waker.)
229    ///
230    /// This function is intended for cases where creating and pinning a future
231    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
232    /// preferred, as this supports polling from multiple tasks at once.
233    ///
234    /// # Return value
235    ///
236    /// The function returns:
237    ///
238    /// * `Poll::Pending` if the unix stream is not ready for reading.
239    /// * `Poll::Ready(Ok(()))` if the unix stream is ready for reading.
240    /// * `Poll::Ready(Err(e))` if an error is encountered.
241    ///
242    /// # Errors
243    ///
244    /// This function may encounter any standard I/O error except `WouldBlock`.
245    ///
246    /// [`readable`]: method@Self::readable
247    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
248        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
249    }
250
251    /// Try to read data from the stream into the provided buffer, returning how
252    /// many bytes were read.
253    ///
254    /// Receives any pending data from the socket but does not wait for new data
255    /// to arrive. On success, returns the number of bytes read. Because
256    /// `try_read()` is non-blocking, the buffer does not have to be stored by
257    /// the async task and can exist entirely on the stack.
258    ///
259    /// Usually, [`readable()`] or [`ready()`] is used with this function.
260    ///
261    /// [`readable()`]: UnixStream::readable()
262    /// [`ready()`]: UnixStream::ready()
263    ///
264    /// # Return
265    ///
266    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
267    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
268    ///
269    /// 1. The stream's read half is closed and will no longer yield data.
270    /// 2. The specified buffer was 0 bytes in length.
271    ///
272    /// If the stream is not ready to read data,
273    /// `Err(io::ErrorKind::WouldBlock)` is returned.
274    ///
275    /// # Examples
276    ///
277    /// ```no_run
278    /// use tokio::net::UnixStream;
279    /// use std::error::Error;
280    /// use std::io;
281    ///
282    /// #[tokio::main]
283    /// async fn main() -> Result<(), Box<dyn Error>> {
284    ///     // Connect to a peer
285    ///     let dir = tempfile::tempdir().unwrap();
286    ///     let bind_path = dir.path().join("bind_path");
287    ///     let stream = UnixStream::connect(bind_path).await?;
288    ///
289    ///     loop {
290    ///         // Wait for the socket to be readable
291    ///         stream.readable().await?;
292    ///
293    ///         // Creating the buffer **after** the `await` prevents it from
294    ///         // being stored in the async task.
295    ///         let mut buf = [0; 4096];
296    ///
297    ///         // Try to read data, this may still fail with `WouldBlock`
298    ///         // if the readiness event is a false positive.
299    ///         match stream.try_read(&mut buf) {
300    ///             Ok(0) => break,
301    ///             Ok(n) => {
302    ///                 println!("read {} bytes", n);
303    ///             }
304    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
305    ///                 continue;
306    ///             }
307    ///             Err(e) => {
308    ///                 return Err(e.into());
309    ///             }
310    ///         }
311    ///     }
312    ///
313    ///     Ok(())
314    /// }
315    /// ```
316    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
317        self.io
318            .registration()
319            .try_io(Interest::READABLE, || (&*self.io).read(buf))
320    }
321
322    /// Tries to read data from the stream into the provided buffers, returning
323    /// how many bytes were read.
324    ///
325    /// Data is copied to fill each buffer in order, with the final buffer
326    /// written to possibly being only partially filled. This method behaves
327    /// equivalently to a single call to [`try_read()`] with concatenated
328    /// buffers.
329    ///
330    /// Receives any pending data from the socket but does not wait for new data
331    /// to arrive. On success, returns the number of bytes read. Because
332    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
333    /// stored by the async task and can exist entirely on the stack.
334    ///
335    /// Usually, [`readable()`] or [`ready()`] is used with this function.
336    ///
337    /// [`try_read()`]: UnixStream::try_read()
338    /// [`readable()`]: UnixStream::readable()
339    /// [`ready()`]: UnixStream::ready()
340    ///
341    /// # Return
342    ///
343    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
344    /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
345    /// and will no longer yield data. If the stream is not ready to read data
346    /// `Err(io::ErrorKind::WouldBlock)` is returned.
347    ///
348    /// # Examples
349    ///
350    /// ```no_run
351    /// use tokio::net::UnixStream;
352    /// use std::error::Error;
353    /// use std::io::{self, IoSliceMut};
354    ///
355    /// #[tokio::main]
356    /// async fn main() -> Result<(), Box<dyn Error>> {
357    ///     // Connect to a peer
358    ///     let dir = tempfile::tempdir().unwrap();
359    ///     let bind_path = dir.path().join("bind_path");
360    ///     let stream = UnixStream::connect(bind_path).await?;
361    ///
362    ///     loop {
363    ///         // Wait for the socket to be readable
364    ///         stream.readable().await?;
365    ///
366    ///         // Creating the buffer **after** the `await` prevents it from
367    ///         // being stored in the async task.
368    ///         let mut buf_a = [0; 512];
369    ///         let mut buf_b = [0; 1024];
370    ///         let mut bufs = [
371    ///             IoSliceMut::new(&mut buf_a),
372    ///             IoSliceMut::new(&mut buf_b),
373    ///         ];
374    ///
375    ///         // Try to read data, this may still fail with `WouldBlock`
376    ///         // if the readiness event is a false positive.
377    ///         match stream.try_read_vectored(&mut bufs) {
378    ///             Ok(0) => break,
379    ///             Ok(n) => {
380    ///                 println!("read {} bytes", n);
381    ///             }
382    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
383    ///                 continue;
384    ///             }
385    ///             Err(e) => {
386    ///                 return Err(e.into());
387    ///             }
388    ///         }
389    ///     }
390    ///
391    ///     Ok(())
392    /// }
393    /// ```
394    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
395        self.io
396            .registration()
397            .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
398    }
399
400    cfg_io_util! {
401        /// Tries to read data from the stream into the provided buffer, advancing the
402        /// buffer's internal cursor, returning how many bytes were read.
403        ///
404        /// Receives any pending data from the socket but does not wait for new data
405        /// to arrive. On success, returns the number of bytes read. Because
406        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
407        /// the async task and can exist entirely on the stack.
408        ///
409        /// Usually, [`readable()`] or [`ready()`] is used with this function.
410        ///
411        /// [`readable()`]: UnixStream::readable()
412        /// [`ready()`]: UnixStream::ready()
413        ///
414        /// # Return
415        ///
416        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
417        /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
418        /// and will no longer yield data. If the stream is not ready to read data
419        /// `Err(io::ErrorKind::WouldBlock)` is returned.
420        ///
421        /// # Examples
422        ///
423        /// ```no_run
424        /// use tokio::net::UnixStream;
425        /// use std::error::Error;
426        /// use std::io;
427        ///
428        /// #[tokio::main]
429        /// async fn main() -> Result<(), Box<dyn Error>> {
430        ///     // Connect to a peer
431        ///     let dir = tempfile::tempdir().unwrap();
432        ///     let bind_path = dir.path().join("bind_path");
433        ///     let stream = UnixStream::connect(bind_path).await?;
434        ///
435        ///     loop {
436        ///         // Wait for the socket to be readable
437        ///         stream.readable().await?;
438        ///
439        ///         let mut buf = Vec::with_capacity(4096);
440        ///
441        ///         // Try to read data, this may still fail with `WouldBlock`
442        ///         // if the readiness event is a false positive.
443        ///         match stream.try_read_buf(&mut buf) {
444        ///             Ok(0) => break,
445        ///             Ok(n) => {
446        ///                 println!("read {} bytes", n);
447        ///             }
448        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
449        ///                 continue;
450        ///             }
451        ///             Err(e) => {
452        ///                 return Err(e.into());
453        ///             }
454        ///         }
455        ///     }
456        ///
457        ///     Ok(())
458        /// }
459        /// ```
460        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
461            self.io.registration().try_io(Interest::READABLE, || {
462                use std::io::Read;
463
464                let dst = buf.chunk_mut();
465                let dst =
466                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
467
468                // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the
469                // buffer.
470                let n = (&*self.io).read(dst)?;
471
472                unsafe {
473                    buf.advance_mut(n);
474                }
475
476                Ok(n)
477            })
478        }
479    }
480
481    /// Waits for the socket to become writable.
482    ///
483    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
484    /// paired with `try_write()`.
485    ///
486    /// # Cancel safety
487    ///
488    /// This method is cancel safe. Once a readiness event occurs, the method
489    /// will continue to return immediately until the readiness event is
490    /// consumed by an attempt to write that fails with `WouldBlock` or
491    /// `Poll::Pending`.
492    ///
493    /// # Examples
494    ///
495    /// ```no_run
496    /// use tokio::net::UnixStream;
497    /// use std::error::Error;
498    /// use std::io;
499    ///
500    /// #[tokio::main]
501    /// async fn main() -> Result<(), Box<dyn Error>> {
502    ///     // Connect to a peer
503    ///     let dir = tempfile::tempdir().unwrap();
504    ///     let bind_path = dir.path().join("bind_path");
505    ///     let stream = UnixStream::connect(bind_path).await?;
506    ///
507    ///     loop {
508    ///         // Wait for the socket to be writable
509    ///         stream.writable().await?;
510    ///
511    ///         // Try to write data, this may still fail with `WouldBlock`
512    ///         // if the readiness event is a false positive.
513    ///         match stream.try_write(b"hello world") {
514    ///             Ok(n) => {
515    ///                 break;
516    ///             }
517    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
518    ///                 continue;
519    ///             }
520    ///             Err(e) => {
521    ///                 return Err(e.into());
522    ///             }
523    ///         }
524    ///     }
525    ///
526    ///     Ok(())
527    /// }
528    /// ```
529    pub async fn writable(&self) -> io::Result<()> {
530        self.ready(Interest::WRITABLE).await?;
531        Ok(())
532    }
533
534    /// Polls for write readiness.
535    ///
536    /// If the unix stream is not currently ready for writing, this method will
537    /// store a clone of the `Waker` from the provided `Context`. When the unix
538    /// stream becomes ready for writing, `Waker::wake` will be called on the
539    /// waker.
540    ///
541    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
542    /// the `Waker` from the `Context` passed to the most recent call is
543    /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
544    /// second, independent waker.)
545    ///
546    /// This function is intended for cases where creating and pinning a future
547    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
548    /// preferred, as this supports polling from multiple tasks at once.
549    ///
550    /// # Return value
551    ///
552    /// The function returns:
553    ///
554    /// * `Poll::Pending` if the unix stream is not ready for writing.
555    /// * `Poll::Ready(Ok(()))` if the unix stream is ready for writing.
556    /// * `Poll::Ready(Err(e))` if an error is encountered.
557    ///
558    /// # Errors
559    ///
560    /// This function may encounter any standard I/O error except `WouldBlock`.
561    ///
562    /// [`writable`]: method@Self::writable
563    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
564        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
565    }
566
567    /// Tries to write a buffer to the stream, returning how many bytes were
568    /// written.
569    ///
570    /// The function will attempt to write the entire contents of `buf`, but
571    /// only part of the buffer may be written.
572    ///
573    /// This function is usually paired with `writable()`.
574    ///
575    /// # Return
576    ///
577    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
578    /// number of bytes written. If the stream is not ready to write data,
579    /// `Err(io::ErrorKind::WouldBlock)` is returned.
580    ///
581    /// # Examples
582    ///
583    /// ```no_run
584    /// use tokio::net::UnixStream;
585    /// use std::error::Error;
586    /// use std::io;
587    ///
588    /// #[tokio::main]
589    /// async fn main() -> Result<(), Box<dyn Error>> {
590    ///     // Connect to a peer
591    ///     let dir = tempfile::tempdir().unwrap();
592    ///     let bind_path = dir.path().join("bind_path");
593    ///     let stream = UnixStream::connect(bind_path).await?;
594    ///
595    ///     loop {
596    ///         // Wait for the socket to be writable
597    ///         stream.writable().await?;
598    ///
599    ///         // Try to write data, this may still fail with `WouldBlock`
600    ///         // if the readiness event is a false positive.
601    ///         match stream.try_write(b"hello world") {
602    ///             Ok(n) => {
603    ///                 break;
604    ///             }
605    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
606    ///                 continue;
607    ///             }
608    ///             Err(e) => {
609    ///                 return Err(e.into());
610    ///             }
611    ///         }
612    ///     }
613    ///
614    ///     Ok(())
615    /// }
616    /// ```
617    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
618        self.io
619            .registration()
620            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
621    }
622
623    /// Tries to write several buffers to the stream, returning how many bytes
624    /// were written.
625    ///
626    /// Data is written from each buffer in order, with the final buffer read
    /// from possibly being only partially consumed. This method behaves
628    /// equivalently to a single call to [`try_write()`] with concatenated
629    /// buffers.
630    ///
631    /// This function is usually paired with `writable()`.
632    ///
633    /// [`try_write()`]: UnixStream::try_write()
634    ///
635    /// # Return
636    ///
637    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
638    /// number of bytes written. If the stream is not ready to write data,
639    /// `Err(io::ErrorKind::WouldBlock)` is returned.
640    ///
641    /// # Examples
642    ///
643    /// ```no_run
644    /// use tokio::net::UnixStream;
645    /// use std::error::Error;
646    /// use std::io;
647    ///
648    /// #[tokio::main]
649    /// async fn main() -> Result<(), Box<dyn Error>> {
650    ///     // Connect to a peer
651    ///     let dir = tempfile::tempdir().unwrap();
652    ///     let bind_path = dir.path().join("bind_path");
653    ///     let stream = UnixStream::connect(bind_path).await?;
654    ///
655    ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
656    ///
657    ///     loop {
658    ///         // Wait for the socket to be writable
659    ///         stream.writable().await?;
660    ///
661    ///         // Try to write data, this may still fail with `WouldBlock`
662    ///         // if the readiness event is a false positive.
663    ///         match stream.try_write_vectored(&bufs) {
664    ///             Ok(n) => {
665    ///                 break;
666    ///             }
667    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
668    ///                 continue;
669    ///             }
670    ///             Err(e) => {
671    ///                 return Err(e.into());
672    ///             }
673    ///         }
674    ///     }
675    ///
676    ///     Ok(())
677    /// }
678    /// ```
679    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
680        self.io
681            .registration()
682            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
683    }
684
685    /// Tries to read or write from the socket using a user-provided IO operation.
686    ///
687    /// If the socket is ready, the provided closure is called. The closure
688    /// should attempt to perform IO operation on the socket by manually
689    /// calling the appropriate syscall. If the operation fails because the
690    /// socket is not actually ready, then the closure should return a
691    /// `WouldBlock` error and the readiness flag is cleared. The return value
692    /// of the closure is then returned by `try_io`.
693    ///
694    /// If the socket is not ready, then the closure is not called
695    /// and a `WouldBlock` error is returned.
696    ///
697    /// The closure should only return a `WouldBlock` error if it has performed
698    /// an IO operation on the socket that failed due to the socket not being
699    /// ready. Returning a `WouldBlock` error in any other situation will
700    /// incorrectly clear the readiness flag, which can cause the socket to
701    /// behave incorrectly.
702    ///
703    /// The closure should not perform the IO operation using any of the methods
704    /// defined on the Tokio `UnixStream` type, as this will mess with the
705    /// readiness flag and can cause the socket to behave incorrectly.
706    ///
707    /// This method is not intended to be used with combined interests.
708    /// The closure should perform only one type of IO operation, so it should not
709    /// require more than one ready state. This method may panic or sleep forever
710    /// if it is called with a combined interest.
711    ///
712    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
713    ///
714    /// [`readable()`]: UnixStream::readable()
715    /// [`writable()`]: UnixStream::writable()
716    /// [`ready()`]: UnixStream::ready()
717    pub fn try_io<R>(
718        &self,
719        interest: Interest,
720        f: impl FnOnce() -> io::Result<R>,
721    ) -> io::Result<R> {
722        self.io
723            .registration()
724            .try_io(interest, || self.io.try_io(f))
725    }
726
727    /// Reads or writes from the socket using a user-provided IO operation.
728    ///
729    /// The readiness of the socket is awaited and when the socket is ready,
730    /// the provided closure is called. The closure should attempt to perform
731    /// IO operation on the socket by manually calling the appropriate syscall.
732    /// If the operation fails because the socket is not actually ready,
733    /// then the closure should return a `WouldBlock` error. In such case the
734    /// readiness flag is cleared and the socket readiness is awaited again.
735    /// This loop is repeated until the closure returns an `Ok` or an error
736    /// other than `WouldBlock`.
737    ///
738    /// The closure should only return a `WouldBlock` error if it has performed
739    /// an IO operation on the socket that failed due to the socket not being
740    /// ready. Returning a `WouldBlock` error in any other situation will
741    /// incorrectly clear the readiness flag, which can cause the socket to
742    /// behave incorrectly.
743    ///
744    /// The closure should not perform the IO operation using any of the methods
745    /// defined on the Tokio `UnixStream` type, as this will mess with the
746    /// readiness flag and can cause the socket to behave incorrectly.
747    ///
748    /// This method is not intended to be used with combined interests.
749    /// The closure should perform only one type of IO operation, so it should not
750    /// require more than one ready state. This method may panic or sleep forever
751    /// if it is called with a combined interest.
752    pub async fn async_io<R>(
753        &self,
754        interest: Interest,
755        mut f: impl FnMut() -> io::Result<R>,
756    ) -> io::Result<R> {
757        self.io
758            .registration()
759            .async_io(interest, || self.io.try_io(&mut f))
760            .await
761    }
762
763    /// Creates new [`UnixStream`] from a [`std::os::unix::net::UnixStream`].
764    ///
765    /// This function is intended to be used to wrap a `UnixStream` from the
766    /// standard library in the Tokio equivalent.
767    ///
768    /// # Notes
769    ///
770    /// The caller is responsible for ensuring that the stream is in
771    /// non-blocking mode. Otherwise all I/O operations on the stream
772    /// will block the thread, which will cause unexpected behavior.
773    /// Non-blocking mode can be set using [`set_nonblocking`].
774    ///
775    /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking
776    ///
777    /// # Examples
778    ///
779    /// ```no_run
780    /// use tokio::net::UnixStream;
781    /// use std::os::unix::net::UnixStream as StdUnixStream;
782    /// # use std::error::Error;
783    ///
784    /// # async fn dox() -> Result<(), Box<dyn Error>> {
785    /// let std_stream = StdUnixStream::connect("/path/to/the/socket")?;
786    /// std_stream.set_nonblocking(true)?;
787    /// let stream = UnixStream::from_std(std_stream)?;
788    /// # Ok(())
789    /// # }
790    /// ```
791    ///
792    /// # Panics
793    ///
794    /// This function panics if it is not called from within a runtime with
795    /// IO enabled.
796    ///
797    /// The runtime is usually set implicitly when this function is called
798    /// from a future driven by a tokio runtime, otherwise runtime can be set
799    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
800    #[track_caller]
801    pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
802        let stream = mio::net::UnixStream::from_std(stream);
803        let io = PollEvented::new(stream)?;
804
805        Ok(UnixStream { io })
806    }
807
808    /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`].
809    ///
810    /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking
811    /// mode set as `true`.  Use [`set_nonblocking`] to change the blocking
812    /// mode if needed.
813    ///
814    /// # Examples
815    ///
816    /// ```
817    /// use std::error::Error;
818    /// use std::io::Read;
819    /// use tokio::net::UnixListener;
820    /// # use tokio::net::UnixStream;
821    /// # use tokio::io::AsyncWriteExt;
822    ///
823    /// #[tokio::main]
824    /// async fn main() -> Result<(), Box<dyn Error>> {
825    ///     let dir = tempfile::tempdir().unwrap();
826    ///     let bind_path = dir.path().join("bind_path");
827    ///
828    ///     let mut data = [0u8; 12];
829    ///     let listener = UnixListener::bind(&bind_path)?;
830    /// #   let handle = tokio::spawn(async {
831    /// #       let mut stream = UnixStream::connect(bind_path).await.unwrap();
832    /// #       stream.write(b"Hello world!").await.unwrap();
833    /// #   });
834    ///     let (tokio_unix_stream, _) = listener.accept().await?;
835    ///     let mut std_unix_stream = tokio_unix_stream.into_std()?;
836    /// #   handle.await.expect("The task being joined has panicked");
837    ///     std_unix_stream.set_nonblocking(false)?;
838    ///     std_unix_stream.read_exact(&mut data)?;
839    /// #   assert_eq!(b"Hello world!", &data);
840    ///     Ok(())
841    /// }
842    /// ```
843    /// [`tokio::net::UnixStream`]: UnixStream
844    /// [`std::os::unix::net::UnixStream`]: std::os::unix::net::UnixStream
845    /// [`set_nonblocking`]: fn@std::os::unix::net::UnixStream::set_nonblocking
846    pub fn into_std(self) -> io::Result<std::os::unix::net::UnixStream> {
847        self.io
848            .into_inner()
849            .map(IntoRawFd::into_raw_fd)
850            .map(|raw_fd| unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) })
851    }
852
853    /// Creates an unnamed pair of connected sockets.
854    ///
855    /// This function will create a pair of interconnected Unix sockets for
856    /// communicating back and forth between one another. Each socket will
857    /// be associated with the default event loop's handle.
858    pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
859        let (a, b) = mio::net::UnixStream::pair()?;
860        let a = UnixStream::new(a)?;
861        let b = UnixStream::new(b)?;
862
863        Ok((a, b))
864    }
865
866    pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> {
867        let io = PollEvented::new(stream)?;
868        Ok(UnixStream { io })
869    }
870
871    /// Returns the socket address of the local half of this connection.
872    ///
873    /// # Examples
874    ///
875    /// ```no_run
876    /// use tokio::net::UnixStream;
877    ///
878    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
879    /// let dir = tempfile::tempdir().unwrap();
880    /// let bind_path = dir.path().join("bind_path");
881    /// let stream = UnixStream::connect(bind_path).await?;
882    ///
883    /// println!("{:?}", stream.local_addr()?);
884    /// # Ok(())
885    /// # }
886    /// ```
887    pub fn local_addr(&self) -> io::Result<SocketAddr> {
888        self.io.local_addr().map(SocketAddr)
889    }
890
891    /// Returns the socket address of the remote half of this connection.
892    ///
893    /// # Examples
894    ///
895    /// ```no_run
896    /// use tokio::net::UnixStream;
897    ///
898    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
899    /// let dir = tempfile::tempdir().unwrap();
900    /// let bind_path = dir.path().join("bind_path");
901    /// let stream = UnixStream::connect(bind_path).await?;
902    ///
903    /// println!("{:?}", stream.peer_addr()?);
904    /// # Ok(())
905    /// # }
906    /// ```
907    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
908        self.io.peer_addr().map(SocketAddr)
909    }
910
    /// Returns effective credentials of the process which called `connect` or `pair`.
    ///
    /// The lookup is delegated to `ucred::get_peer_cred`; any error it
    /// reports is returned unchanged.
    pub fn peer_cred(&self) -> io::Result<UCred> {
        ucred::get_peer_cred(self)
    }
915
    /// Returns the value of the `SO_ERROR` option.
    ///
    /// This forwards to `take_error` on the underlying socket and returns its
    /// result unchanged.
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        self.io.take_error()
    }
920
    /// Shuts down the read, write, or both halves of this connection.
    ///
    /// This function will cause all pending and future I/O calls on the
    /// specified portions to immediately return with an appropriate value
    /// (see the documentation of `Shutdown`).
    ///
    /// `how` selects which direction(s) to close and is passed straight
    /// through to `shutdown` on the underlying socket.
    pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
        self.io.shutdown(how)
    }
929
    // These lifetime markers also appear in the generated documentation, and make
    // it more clear that this is a *borrowed* split.
    #[allow(clippy::needless_lifetimes)]
    /// Splits a `UnixStream` into a read half and a write half, which can be used
    /// to read and write the stream concurrently.
    ///
    /// Both halves borrow the stream for `'a`, so the `UnixStream` itself
    /// cannot be used again until they have been dropped.
    ///
    /// This method is more efficient than [`into_split`], but the halves cannot be
    /// moved into independently spawned tasks.
    ///
    /// [`into_split`]: Self::into_split()
    pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
        split(self)
    }
943
    /// Splits a `UnixStream` into a read half and a write half, which can be used
    /// to read and write the stream concurrently.
    ///
    /// This consumes the stream and returns owned halves.
    ///
    /// Unlike [`split`], the owned halves can be moved to separate tasks, however
    /// this comes at the cost of a heap allocation.
    ///
    /// **Note:** Dropping the write half will shut down the write half of the
    /// stream. This is equivalent to calling [`shutdown()`] on the `UnixStream`.
    ///
    /// [`split`]: Self::split()
    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
    pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
        split_owned(self)
    }
958}
959
960impl TryFrom<net::UnixStream> for UnixStream {
961    type Error = io::Error;
962
963    /// Consumes stream, returning the tokio I/O object.
964    ///
965    /// This is equivalent to
966    /// [`UnixStream::from_std(stream)`](UnixStream::from_std).
967    fn try_from(stream: net::UnixStream) -> io::Result<Self> {
968        Self::from_std(stream)
969    }
970}
971
972impl AsyncRead for UnixStream {
973    fn poll_read(
974        self: Pin<&mut Self>,
975        cx: &mut Context<'_>,
976        buf: &mut ReadBuf<'_>,
977    ) -> Poll<io::Result<()>> {
978        self.poll_read_priv(cx, buf)
979    }
980}
981
982impl AsyncWrite for UnixStream {
983    fn poll_write(
984        self: Pin<&mut Self>,
985        cx: &mut Context<'_>,
986        buf: &[u8],
987    ) -> Poll<io::Result<usize>> {
988        self.poll_write_priv(cx, buf)
989    }
990
991    fn poll_write_vectored(
992        self: Pin<&mut Self>,
993        cx: &mut Context<'_>,
994        bufs: &[io::IoSlice<'_>],
995    ) -> Poll<io::Result<usize>> {
996        self.poll_write_vectored_priv(cx, bufs)
997    }
998
999    fn is_write_vectored(&self) -> bool {
1000        true
1001    }
1002
1003    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1004        Poll::Ready(Ok(()))
1005    }
1006
1007    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1008        self.shutdown_std(std::net::Shutdown::Write)?;
1009        Poll::Ready(Ok(()))
1010    }
1011}
1012
impl UnixStream {
    // == Poll IO functions that takes `&self` ==
    //
    // To read or write without mutable access to the `UnixStream`, combine the
    // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
    // `try_write` methods.

    /// Polls the underlying `PollEvented` for a read into `buf`, using only a
    /// shared reference to the stream.
    pub(crate) fn poll_read_priv(
        &self,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Safety: `UnixStream::read` correctly handles reads into uninitialized memory
        unsafe { self.io.poll_read(cx, buf) }
    }

    /// Polls the underlying `PollEvented` for a write of `buf`, using only a
    /// shared reference to the stream.
    pub(crate) fn poll_write_priv(
        &self,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.io.poll_write(cx, buf)
    }

    /// Polls the underlying `PollEvented` for a vectored write of `bufs`,
    /// using only a shared reference to the stream.
    pub(super) fn poll_write_vectored_priv(
        &self,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        self.io.poll_write_vectored(cx, bufs)
    }
}
1045
1046impl fmt::Debug for UnixStream {
1047    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1048        self.io.fmt(f)
1049    }
1050}
1051
impl AsRawFd for UnixStream {
    /// Returns the raw file descriptor reported by the inner `io` object.
    fn as_raw_fd(&self) -> RawFd {
        self.io.as_raw_fd()
    }
}
1057
1058impl AsFd for UnixStream {
1059    fn as_fd(&self) -> BorrowedFd<'_> {
1060        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1061    }
1062}