madsim_real_tokio/net/tcp/
split.rs

1//! `TcpStream` split support.
2//!
3//! A `TcpStream` can be split into a `ReadHalf` and a
4//! `WriteHalf` with the `TcpStream::split` method. `ReadHalf`
5//! implements `AsyncRead` while `WriteHalf` implements `AsyncWrite`.
6//!
7//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8//! split has no associated overhead and enforces all invariants at the type
9//! level.
10
11use crate::future::poll_fn;
12use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
13use crate::net::TcpStream;
14
15use std::io;
16use std::net::{Shutdown, SocketAddr};
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20cfg_io_util! {
21    use bytes::BufMut;
22}
23
24/// Borrowed read half of a [`TcpStream`], created by [`split`].
25///
26/// Reading from a `ReadHalf` is usually done using the convenience methods found on the
27/// [`AsyncReadExt`] trait.
28///
29/// [`TcpStream`]: TcpStream
30/// [`split`]: TcpStream::split()
31/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
32#[derive(Debug)]
33pub struct ReadHalf<'a>(&'a TcpStream);
34
35/// Borrowed write half of a [`TcpStream`], created by [`split`].
36///
37/// Note that in the [`AsyncWrite`] implementation of this type, [`poll_shutdown`] will
38/// shut down the TCP stream in the write direction.
39///
40/// Writing to an `WriteHalf` is usually done using the convenience methods found
41/// on the [`AsyncWriteExt`] trait.
42///
43/// [`TcpStream`]: TcpStream
44/// [`split`]: TcpStream::split()
45/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
46/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
47/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
48#[derive(Debug)]
49pub struct WriteHalf<'a>(&'a TcpStream);
50
51pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
52    (ReadHalf(&*stream), WriteHalf(&*stream))
53}
54
55impl ReadHalf<'_> {
56    /// Attempts to receive data on the socket, without removing that data from
57    /// the queue, registering the current task for wakeup if data is not yet
58    /// available.
59    ///
60    /// Note that on multiple calls to `poll_peek` or `poll_read`, only the
61    /// `Waker` from the `Context` passed to the most recent call is scheduled
62    /// to receive a wakeup.
63    ///
64    /// See the [`TcpStream::poll_peek`] level documentation for more details.
65    ///
66    /// # Examples
67    ///
68    /// ```no_run
69    /// use tokio::io::{self, ReadBuf};
70    /// use tokio::net::TcpStream;
71    ///
72    /// use futures::future::poll_fn;
73    ///
74    /// #[tokio::main]
75    /// async fn main() -> io::Result<()> {
76    ///     let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
77    ///     let (mut read_half, _) = stream.split();
78    ///     let mut buf = [0; 10];
79    ///     let mut buf = ReadBuf::new(&mut buf);
80    ///
81    ///     poll_fn(|cx| {
82    ///         read_half.poll_peek(cx, &mut buf)
83    ///     }).await?;
84    ///
85    ///     Ok(())
86    /// }
87    /// ```
88    ///
89    /// [`TcpStream::poll_peek`]: TcpStream::poll_peek
90    pub fn poll_peek(
91        &mut self,
92        cx: &mut Context<'_>,
93        buf: &mut ReadBuf<'_>,
94    ) -> Poll<io::Result<usize>> {
95        self.0.poll_peek(cx, buf)
96    }
97
98    /// Receives data on the socket from the remote address to which it is
99    /// connected, without removing that data from the queue. On success,
100    /// returns the number of bytes peeked.
101    ///
102    /// See the [`TcpStream::peek`] level documentation for more details.
103    ///
104    /// [`TcpStream::peek`]: TcpStream::peek
105    ///
106    /// # Examples
107    ///
108    /// ```no_run
109    /// use tokio::net::TcpStream;
110    /// use tokio::io::AsyncReadExt;
111    /// use std::error::Error;
112    ///
113    /// #[tokio::main]
114    /// async fn main() -> Result<(), Box<dyn Error>> {
115    ///     // Connect to a peer
116    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
117    ///     let (mut read_half, _) = stream.split();
118    ///
119    ///     let mut b1 = [0; 10];
120    ///     let mut b2 = [0; 10];
121    ///
122    ///     // Peek at the data
123    ///     let n = read_half.peek(&mut b1).await?;
124    ///
125    ///     // Read the data
126    ///     assert_eq!(n, read_half.read(&mut b2[..n]).await?);
127    ///     assert_eq!(&b1[..n], &b2[..n]);
128    ///
129    ///     Ok(())
130    /// }
131    /// ```
132    ///
133    /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
134    ///
135    /// [`read`]: fn@crate::io::AsyncReadExt::read
136    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
137    pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
138        let mut buf = ReadBuf::new(buf);
139        poll_fn(|cx| self.poll_peek(cx, &mut buf)).await
140    }
141
142    /// Waits for any of the requested ready states.
143    ///
144    /// This function is usually paired with [`try_read()`]. It can be used instead
145    /// of [`readable()`] to check the returned ready set for [`Ready::READABLE`]
146    /// and [`Ready::READ_CLOSED`] events.
147    ///
148    /// The function may complete without the socket being ready. This is a
149    /// false-positive and attempting an operation will return with
150    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
151    /// [`Ready`] set, so you should always check the returned value and possibly
152    /// wait again if the requested states are not set.
153    ///
154    /// This function is equivalent to [`TcpStream::ready`].
155    ///
156    /// [`try_read()`]: Self::try_read
157    /// [`readable()`]: Self::readable
158    ///
159    /// # Cancel safety
160    ///
161    /// This method is cancel safe. Once a readiness event occurs, the method
162    /// will continue to return immediately until the readiness event is
163    /// consumed by an attempt to read or write that fails with `WouldBlock` or
164    /// `Poll::Pending`.
165    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
166        self.0.ready(interest).await
167    }
168
169    /// Waits for the socket to become readable.
170    ///
171    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
172    /// paired with `try_read()`.
173    ///
174    /// This function is also equivalent to [`TcpStream::ready`].
175    ///
176    /// # Cancel safety
177    ///
178    /// This method is cancel safe. Once a readiness event occurs, the method
179    /// will continue to return immediately until the readiness event is
180    /// consumed by an attempt to read that fails with `WouldBlock` or
181    /// `Poll::Pending`.
182    pub async fn readable(&self) -> io::Result<()> {
183        self.0.readable().await
184    }
185
186    /// Tries to read data from the stream into the provided buffer, returning how
187    /// many bytes were read.
188    ///
189    /// Receives any pending data from the socket but does not wait for new data
190    /// to arrive. On success, returns the number of bytes read. Because
191    /// `try_read()` is non-blocking, the buffer does not have to be stored by
192    /// the async task and can exist entirely on the stack.
193    ///
194    /// Usually, [`readable()`] or [`ready()`] is used with this function.
195    ///
196    /// [`readable()`]: Self::readable()
197    /// [`ready()`]: Self::ready()
198    ///
199    /// # Return
200    ///
201    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
202    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
203    ///
204    /// 1. The stream's read half is closed and will no longer yield data.
205    /// 2. The specified buffer was 0 bytes in length.
206    ///
207    /// If the stream is not ready to read data,
208    /// `Err(io::ErrorKind::WouldBlock)` is returned.
209    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
210        self.0.try_read(buf)
211    }
212
213    /// Tries to read data from the stream into the provided buffers, returning
214    /// how many bytes were read.
215    ///
216    /// Data is copied to fill each buffer in order, with the final buffer
217    /// written to possibly being only partially filled. This method behaves
218    /// equivalently to a single call to [`try_read()`] with concatenated
219    /// buffers.
220    ///
221    /// Receives any pending data from the socket but does not wait for new data
222    /// to arrive. On success, returns the number of bytes read. Because
223    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
224    /// stored by the async task and can exist entirely on the stack.
225    ///
226    /// Usually, [`readable()`] or [`ready()`] is used with this function.
227    ///
228    /// [`try_read()`]: Self::try_read()
229    /// [`readable()`]: Self::readable()
230    /// [`ready()`]: Self::ready()
231    ///
232    /// # Return
233    ///
234    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
235    /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
236    /// and will no longer yield data. If the stream is not ready to read data
237    /// `Err(io::ErrorKind::WouldBlock)` is returned.
238    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
239        self.0.try_read_vectored(bufs)
240    }
241
242    cfg_io_util! {
243        /// Tries to read data from the stream into the provided buffer, advancing the
244        /// buffer's internal cursor, returning how many bytes were read.
245        ///
246        /// Receives any pending data from the socket but does not wait for new data
247        /// to arrive. On success, returns the number of bytes read. Because
248        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
249        /// the async task and can exist entirely on the stack.
250        ///
251        /// Usually, [`readable()`] or [`ready()`] is used with this function.
252        ///
253        /// [`readable()`]: Self::readable()
254        /// [`ready()`]: Self::ready()
255        ///
256        /// # Return
257        ///
258        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
259        /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
260        /// and will no longer yield data. If the stream is not ready to read data
261        /// `Err(io::ErrorKind::WouldBlock)` is returned.
262        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
263            self.0.try_read_buf(buf)
264        }
265    }
266
267    /// Returns the remote address that this stream is connected to.
268    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
269        self.0.peer_addr()
270    }
271
272    /// Returns the local address that this stream is bound to.
273    pub fn local_addr(&self) -> io::Result<SocketAddr> {
274        self.0.local_addr()
275    }
276}
277
278impl WriteHalf<'_> {
279    /// Waits for any of the requested ready states.
280    ///
281    /// This function is usually paired with [`try_write()`]. It can be used instead
282    /// of [`writable()`] to check the returned ready set for [`Ready::WRITABLE`]
283    /// and [`Ready::WRITE_CLOSED`] events.
284    ///
285    /// The function may complete without the socket being ready. This is a
286    /// false-positive and attempting an operation will return with
287    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
288    /// [`Ready`] set, so you should always check the returned value and possibly
289    /// wait again if the requested states are not set.
290    ///
291    /// This function is equivalent to [`TcpStream::ready`].
292    ///
293    /// [`try_write()`]: Self::try_write
294    /// [`writable()`]: Self::writable
295    ///
296    /// # Cancel safety
297    ///
298    /// This method is cancel safe. Once a readiness event occurs, the method
299    /// will continue to return immediately until the readiness event is
300    /// consumed by an attempt to read or write that fails with `WouldBlock` or
301    /// `Poll::Pending`.
302    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
303        self.0.ready(interest).await
304    }
305
306    /// Waits for the socket to become writable.
307    ///
308    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
309    /// paired with `try_write()`.
310    ///
311    /// # Cancel safety
312    ///
313    /// This method is cancel safe. Once a readiness event occurs, the method
314    /// will continue to return immediately until the readiness event is
315    /// consumed by an attempt to write that fails with `WouldBlock` or
316    /// `Poll::Pending`.
317    pub async fn writable(&self) -> io::Result<()> {
318        self.0.writable().await
319    }
320
321    /// Tries to write a buffer to the stream, returning how many bytes were
322    /// written.
323    ///
324    /// The function will attempt to write the entire contents of `buf`, but
325    /// only part of the buffer may be written.
326    ///
327    /// This function is usually paired with `writable()`.
328    ///
329    /// # Return
330    ///
331    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
332    /// number of bytes written. If the stream is not ready to write data,
333    /// `Err(io::ErrorKind::WouldBlock)` is returned.
334    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
335        self.0.try_write(buf)
336    }
337
338    /// Tries to write several buffers to the stream, returning how many bytes
339    /// were written.
340    ///
341    /// Data is written from each buffer in order, with the final buffer read
342    /// from possible being only partially consumed. This method behaves
343    /// equivalently to a single call to [`try_write()`] with concatenated
344    /// buffers.
345    ///
346    /// This function is usually paired with `writable()`.
347    ///
348    /// [`try_write()`]: Self::try_write()
349    ///
350    /// # Return
351    ///
352    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
353    /// number of bytes written. If the stream is not ready to write data,
354    /// `Err(io::ErrorKind::WouldBlock)` is returned.
355    pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
356        self.0.try_write_vectored(bufs)
357    }
358
359    /// Returns the remote address that this stream is connected to.
360    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
361        self.0.peer_addr()
362    }
363
364    /// Returns the local address that this stream is bound to.
365    pub fn local_addr(&self) -> io::Result<SocketAddr> {
366        self.0.local_addr()
367    }
368}
369
370impl AsyncRead for ReadHalf<'_> {
371    fn poll_read(
372        self: Pin<&mut Self>,
373        cx: &mut Context<'_>,
374        buf: &mut ReadBuf<'_>,
375    ) -> Poll<io::Result<()>> {
376        self.0.poll_read_priv(cx, buf)
377    }
378}
379
380impl AsyncWrite for WriteHalf<'_> {
381    fn poll_write(
382        self: Pin<&mut Self>,
383        cx: &mut Context<'_>,
384        buf: &[u8],
385    ) -> Poll<io::Result<usize>> {
386        self.0.poll_write_priv(cx, buf)
387    }
388
389    fn poll_write_vectored(
390        self: Pin<&mut Self>,
391        cx: &mut Context<'_>,
392        bufs: &[io::IoSlice<'_>],
393    ) -> Poll<io::Result<usize>> {
394        self.0.poll_write_vectored_priv(cx, bufs)
395    }
396
397    fn is_write_vectored(&self) -> bool {
398        self.0.is_write_vectored()
399    }
400
401    #[inline]
402    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
403        // tcp flush is a no-op
404        Poll::Ready(Ok(()))
405    }
406
407    // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
408    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
409        self.0.shutdown_std(Shutdown::Write).into()
410    }
411}
412
413impl AsRef<TcpStream> for ReadHalf<'_> {
414    fn as_ref(&self) -> &TcpStream {
415        self.0
416    }
417}
418
419impl AsRef<TcpStream> for WriteHalf<'_> {
420    fn as_ref(&self) -> &TcpStream {
421        self.0
422    }
423}