madsim_real_tokio/net/unix/
split_owned.rs

1//! `UnixStream` owned split support.
2//!
3//! A `UnixStream` can be split into an `OwnedReadHalf` and a `OwnedWriteHalf`
4//! with the `UnixStream::into_split` method.  `OwnedReadHalf` implements
5//! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`.
6//!
7//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8//! split has no associated overhead and enforces all invariants at the type
9//! level.
10
11use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
12use crate::net::UnixStream;
13
14use crate::net::unix::SocketAddr;
15use std::error::Error;
16use std::net::Shutdown;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20use std::{fmt, io};
21
22cfg_io_util! {
23    use bytes::BufMut;
24}
25
26/// Owned read half of a [`UnixStream`], created by [`into_split`].
27///
28/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
29/// on the [`AsyncReadExt`] trait.
30///
31/// [`UnixStream`]: crate::net::UnixStream
32/// [`into_split`]: crate::net::UnixStream::into_split()
33/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
34#[derive(Debug)]
35pub struct OwnedReadHalf {
36    inner: Arc<UnixStream>,
37}
38
39/// Owned write half of a [`UnixStream`], created by [`into_split`].
40///
41/// Note that in the [`AsyncWrite`] implementation of this type,
42/// [`poll_shutdown`] will shut down the stream in the write direction.
43/// Dropping the write half will also shut down the write half of the stream.
44///
45/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods
46/// found on the [`AsyncWriteExt`] trait.
47///
48/// [`UnixStream`]: crate::net::UnixStream
49/// [`into_split`]: crate::net::UnixStream::into_split()
50/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
51/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
52/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
53#[derive(Debug)]
54pub struct OwnedWriteHalf {
55    inner: Arc<UnixStream>,
56    shutdown_on_drop: bool,
57}
58
59pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) {
60    let arc = Arc::new(stream);
61    let read = OwnedReadHalf {
62        inner: Arc::clone(&arc),
63    };
64    let write = OwnedWriteHalf {
65        inner: arc,
66        shutdown_on_drop: true,
67    };
68    (read, write)
69}
70
71pub(crate) fn reunite(
72    read: OwnedReadHalf,
73    write: OwnedWriteHalf,
74) -> Result<UnixStream, ReuniteError> {
75    if Arc::ptr_eq(&read.inner, &write.inner) {
76        write.forget();
77        // This unwrap cannot fail as the api does not allow creating more than two Arcs,
78        // and we just dropped the other half.
79        Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite"))
80    } else {
81        Err(ReuniteError(read, write))
82    }
83}
84
85/// Error indicating that two halves were not from the same socket, and thus could
86/// not be reunited.
87#[derive(Debug)]
88pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf);
89
90impl fmt::Display for ReuniteError {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        write!(
93            f,
94            "tried to reunite halves that are not from the same socket"
95        )
96    }
97}
98
99impl Error for ReuniteError {}
100
101impl OwnedReadHalf {
102    /// Attempts to put the two halves of a `UnixStream` back together and
103    /// recover the original socket. Succeeds only if the two halves
104    /// originated from the same call to [`into_split`].
105    ///
106    /// [`into_split`]: crate::net::UnixStream::into_split()
107    pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> {
108        reunite(self, other)
109    }
110
111    /// Waits for any of the requested ready states.
112    ///
113    /// This function is usually paired with [`try_read()`]. It can be used instead
114    /// of [`readable()`] to check the returned ready set for [`Ready::READABLE`]
115    /// and [`Ready::READ_CLOSED`] events.
116    ///
117    /// The function may complete without the socket being ready. This is a
118    /// false-positive and attempting an operation will return with
119    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
120    /// [`Ready`] set, so you should always check the returned value and possibly
121    /// wait again if the requested states are not set.
122    ///
123    /// This function is equivalent to [`UnixStream::ready`].
124    ///
125    /// [`try_read()`]: Self::try_read
126    /// [`readable()`]: Self::readable
127    ///
128    /// # Cancel safety
129    ///
130    /// This method is cancel safe. Once a readiness event occurs, the method
131    /// will continue to return immediately until the readiness event is
132    /// consumed by an attempt to read or write that fails with `WouldBlock` or
133    /// `Poll::Pending`.
134    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
135        self.inner.ready(interest).await
136    }
137
138    /// Waits for the socket to become readable.
139    ///
140    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
141    /// paired with `try_read()`.
142    ///
143    /// # Cancel safety
144    ///
145    /// This method is cancel safe. Once a readiness event occurs, the method
146    /// will continue to return immediately until the readiness event is
147    /// consumed by an attempt to read that fails with `WouldBlock` or
148    /// `Poll::Pending`.
149    pub async fn readable(&self) -> io::Result<()> {
150        self.inner.readable().await
151    }
152
153    /// Tries to read data from the stream into the provided buffer, returning how
154    /// many bytes were read.
155    ///
156    /// Receives any pending data from the socket but does not wait for new data
157    /// to arrive. On success, returns the number of bytes read. Because
158    /// `try_read()` is non-blocking, the buffer does not have to be stored by
159    /// the async task and can exist entirely on the stack.
160    ///
161    /// Usually, [`readable()`] or [`ready()`] is used with this function.
162    ///
163    /// [`readable()`]: Self::readable()
164    /// [`ready()`]: Self::ready()
165    ///
166    /// # Return
167    ///
168    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
169    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
170    ///
171    /// 1. The stream's read half is closed and will no longer yield data.
172    /// 2. The specified buffer was 0 bytes in length.
173    ///
174    /// If the stream is not ready to read data,
175    /// `Err(io::ErrorKind::WouldBlock)` is returned.
176    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
177        self.inner.try_read(buf)
178    }
179
180    cfg_io_util! {
181        /// Tries to read data from the stream into the provided buffer, advancing the
182        /// buffer's internal cursor, returning how many bytes were read.
183        ///
184        /// Receives any pending data from the socket but does not wait for new data
185        /// to arrive. On success, returns the number of bytes read. Because
186        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
187        /// the async task and can exist entirely on the stack.
188        ///
189        /// Usually, [`readable()`] or [`ready()`] is used with this function.
190        ///
191        /// [`readable()`]: Self::readable()
192        /// [`ready()`]: Self::ready()
193        ///
194        /// # Return
195        ///
196        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
197        /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
198        /// and will no longer yield data. If the stream is not ready to read data
199        /// `Err(io::ErrorKind::WouldBlock)` is returned.
200        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
201            self.inner.try_read_buf(buf)
202        }
203    }
204
205    /// Tries to read data from the stream into the provided buffers, returning
206    /// how many bytes were read.
207    ///
208    /// Data is copied to fill each buffer in order, with the final buffer
209    /// written to possibly being only partially filled. This method behaves
210    /// equivalently to a single call to [`try_read()`] with concatenated
211    /// buffers.
212    ///
213    /// Receives any pending data from the socket but does not wait for new data
214    /// to arrive. On success, returns the number of bytes read. Because
215    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
216    /// stored by the async task and can exist entirely on the stack.
217    ///
218    /// Usually, [`readable()`] or [`ready()`] is used with this function.
219    ///
220    /// [`try_read()`]: Self::try_read()
221    /// [`readable()`]: Self::readable()
222    /// [`ready()`]: Self::ready()
223    ///
224    /// # Return
225    ///
226    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
227    /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
228    /// and will no longer yield data. If the stream is not ready to read data
229    /// `Err(io::ErrorKind::WouldBlock)` is returned.
230    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
231        self.inner.try_read_vectored(bufs)
232    }
233
234    /// Returns the socket address of the remote half of this connection.
235    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
236        self.inner.peer_addr()
237    }
238
239    /// Returns the socket address of the local half of this connection.
240    pub fn local_addr(&self) -> io::Result<SocketAddr> {
241        self.inner.local_addr()
242    }
243}
244
245impl AsyncRead for OwnedReadHalf {
246    fn poll_read(
247        self: Pin<&mut Self>,
248        cx: &mut Context<'_>,
249        buf: &mut ReadBuf<'_>,
250    ) -> Poll<io::Result<()>> {
251        self.inner.poll_read_priv(cx, buf)
252    }
253}
254
255impl OwnedWriteHalf {
256    /// Attempts to put the two halves of a `UnixStream` back together and
257    /// recover the original socket. Succeeds only if the two halves
258    /// originated from the same call to [`into_split`].
259    ///
260    /// [`into_split`]: crate::net::UnixStream::into_split()
261    pub fn reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError> {
262        reunite(other, self)
263    }
264
265    /// Destroys the write half, but don't close the write half of the stream
266    /// until the read half is dropped. If the read half has already been
267    /// dropped, this closes the stream.
268    pub fn forget(mut self) {
269        self.shutdown_on_drop = false;
270        drop(self);
271    }
272
273    /// Waits for any of the requested ready states.
274    ///
275    /// This function is usually paired with [`try_write()`]. It can be used instead
276    /// of [`writable()`] to check the returned ready set for [`Ready::WRITABLE`]
277    /// and [`Ready::WRITE_CLOSED`] events.
278    ///
279    /// The function may complete without the socket being ready. This is a
280    /// false-positive and attempting an operation will return with
281    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
282    /// [`Ready`] set, so you should always check the returned value and possibly
283    /// wait again if the requested states are not set.
284    ///
285    /// This function is equivalent to [`UnixStream::ready`].
286    ///
287    /// [`try_write()`]: Self::try_write
288    /// [`writable()`]: Self::writable
289    ///
290    /// # Cancel safety
291    ///
292    /// This method is cancel safe. Once a readiness event occurs, the method
293    /// will continue to return immediately until the readiness event is
294    /// consumed by an attempt to read or write that fails with `WouldBlock` or
295    /// `Poll::Pending`.
296    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
297        self.inner.ready(interest).await
298    }
299
300    /// Waits for the socket to become writable.
301    ///
302    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
303    /// paired with `try_write()`.
304    ///
305    /// # Cancel safety
306    ///
307    /// This method is cancel safe. Once a readiness event occurs, the method
308    /// will continue to return immediately until the readiness event is
309    /// consumed by an attempt to write that fails with `WouldBlock` or
310    /// `Poll::Pending`.
311    pub async fn writable(&self) -> io::Result<()> {
312        self.inner.writable().await
313    }
314
315    /// Tries to write a buffer to the stream, returning how many bytes were
316    /// written.
317    ///
318    /// The function will attempt to write the entire contents of `buf`, but
319    /// only part of the buffer may be written.
320    ///
321    /// This function is usually paired with `writable()`.
322    ///
323    /// # Return
324    ///
325    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
326    /// number of bytes written. If the stream is not ready to write data,
327    /// `Err(io::ErrorKind::WouldBlock)` is returned.
328    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
329        self.inner.try_write(buf)
330    }
331
332    /// Tries to write several buffers to the stream, returning how many bytes
333    /// were written.
334    ///
335    /// Data is written from each buffer in order, with the final buffer read
336    /// from possible being only partially consumed. This method behaves
337    /// equivalently to a single call to [`try_write()`] with concatenated
338    /// buffers.
339    ///
340    /// This function is usually paired with `writable()`.
341    ///
342    /// [`try_write()`]: Self::try_write()
343    ///
344    /// # Return
345    ///
346    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
347    /// number of bytes written. If the stream is not ready to write data,
348    /// `Err(io::ErrorKind::WouldBlock)` is returned.
349    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
350        self.inner.try_write_vectored(buf)
351    }
352
353    /// Returns the socket address of the remote half of this connection.
354    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
355        self.inner.peer_addr()
356    }
357
358    /// Returns the socket address of the local half of this connection.
359    pub fn local_addr(&self) -> io::Result<SocketAddr> {
360        self.inner.local_addr()
361    }
362}
363
364impl Drop for OwnedWriteHalf {
365    fn drop(&mut self) {
366        if self.shutdown_on_drop {
367            let _ = self.inner.shutdown_std(Shutdown::Write);
368        }
369    }
370}
371
372impl AsyncWrite for OwnedWriteHalf {
373    fn poll_write(
374        self: Pin<&mut Self>,
375        cx: &mut Context<'_>,
376        buf: &[u8],
377    ) -> Poll<io::Result<usize>> {
378        self.inner.poll_write_priv(cx, buf)
379    }
380
381    fn poll_write_vectored(
382        self: Pin<&mut Self>,
383        cx: &mut Context<'_>,
384        bufs: &[io::IoSlice<'_>],
385    ) -> Poll<io::Result<usize>> {
386        self.inner.poll_write_vectored_priv(cx, bufs)
387    }
388
389    fn is_write_vectored(&self) -> bool {
390        self.inner.is_write_vectored()
391    }
392
393    #[inline]
394    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
395        // flush is a no-op
396        Poll::Ready(Ok(()))
397    }
398
399    // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
400    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
401        let res = self.inner.shutdown_std(Shutdown::Write);
402        if res.is_ok() {
403            Pin::into_inner(self).shutdown_on_drop = false;
404        }
405        res.into()
406    }
407}
408
409impl AsRef<UnixStream> for OwnedReadHalf {
410    fn as_ref(&self) -> &UnixStream {
411        &self.inner
412    }
413}
414
415impl AsRef<UnixStream> for OwnedWriteHalf {
416    fn as_ref(&self) -> &UnixStream {
417        &self.inner
418    }
419}