broker_tokio/net/tcp/
split.rs

1//! `TcpStream` split support.
2//!
3//! A `TcpStream` can be split into a `ReadHalf` and a
4//! `WriteHalf` with the `TcpStream::split` method. `ReadHalf`
5//! implements `AsyncRead` while `WriteHalf` implements `AsyncWrite`.
6//!
7//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8//! split has no associated overhead and enforces all invariants at the type
9//! level.
10
11use crate::future::poll_fn;
12use crate::io::{AsyncRead, AsyncWrite};
13use crate::net::TcpStream;
14
15use bytes::Buf;
16use std::io;
17use std::mem::MaybeUninit;
18use std::net::Shutdown;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21
22/// Read half of a `TcpStream`.
23#[derive(Debug)]
24pub struct ReadHalf<'a>(&'a TcpStream);
25
26/// Write half of a `TcpStream`.
27///
28/// Note that in the `AsyncWrite` implemenation of `TcpStreamWriteHalf`,
29/// `poll_shutdown` actually shuts down the TCP stream in the write direction.
30#[derive(Debug)]
31pub struct WriteHalf<'a>(&'a TcpStream);
32
33pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
34    (ReadHalf(&*stream), WriteHalf(&*stream))
35}
36
37impl ReadHalf<'_> {
38    /// Attempt to receive data on the socket, without removing that data from
39    /// the queue, registering the current task for wakeup if data is not yet
40    /// available.
41    ///
42    /// See the [`TcpStream::poll_peek`] level documenation for more details.
43    ///
44    /// # Examples
45    ///
46    /// ```no_run
47    /// use tokio::io;
48    /// use tokio::net::TcpStream;
49    ///
50    /// use futures::future::poll_fn;
51    ///
52    /// #[tokio::main]
53    /// async fn main() -> io::Result<()> {
54    ///     let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
55    ///     let (mut read_half, _) = stream.split();
56    ///     let mut buf = [0; 10];
57    ///
58    ///     poll_fn(|cx| {
59    ///         read_half.poll_peek(cx, &mut buf)
60    ///     }).await?;
61    ///
62    ///     Ok(())
63    /// }
64    /// ```
65    ///
66    /// [`TcpStream::poll_peek`]: TcpStream::poll_peek
67    pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
68        self.0.poll_peek2(cx, buf)
69    }
70
71    /// Receives data on the socket from the remote address to which it is
72    /// connected, without removing that data from the queue. On success,
73    /// returns the number of bytes peeked.
74    ///
75    /// See the [`TcpStream::peek`] level documenation for more details.
76    ///
77    /// # Examples
78    ///
79    /// ```no_run
80    /// use tokio::net::TcpStream;
81    /// use tokio::prelude::*;
82    /// use std::error::Error;
83    ///
84    /// #[tokio::main]
85    /// async fn main() -> Result<(), Box<dyn Error>> {
86    ///     // Connect to a peer
87    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
88    ///     let (mut read_half, _) = stream.split();
89    ///
90    ///     let mut b1 = [0; 10];
91    ///     let mut b2 = [0; 10];
92    ///
93    ///     // Peek at the data
94    ///     let n = read_half.peek(&mut b1).await?;
95    ///
96    ///     // Read the data
97    ///     assert_eq!(n, read_half.read(&mut b2[..n]).await?);
98    ///     assert_eq!(&b1[..n], &b2[..n]);
99    ///
100    ///     Ok(())
101    /// }
102    /// ```
103    ///
104    /// [`TcpStream::peek`]: TcpStream::peek
105    pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
106        poll_fn(|cx| self.poll_peek(cx, buf)).await
107    }
108}
109
110impl AsyncRead for ReadHalf<'_> {
111    unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
112        false
113    }
114
115    fn poll_read(
116        self: Pin<&mut Self>,
117        cx: &mut Context<'_>,
118        buf: &mut [u8],
119    ) -> Poll<io::Result<usize>> {
120        self.0.poll_read_priv(cx, buf)
121    }
122}
123
124impl AsyncWrite for WriteHalf<'_> {
125    fn poll_write(
126        self: Pin<&mut Self>,
127        cx: &mut Context<'_>,
128        buf: &[u8],
129    ) -> Poll<io::Result<usize>> {
130        self.0.poll_write_priv(cx, buf)
131    }
132
133    fn poll_write_buf<B: Buf>(
134        self: Pin<&mut Self>,
135        cx: &mut Context<'_>,
136        buf: &mut B,
137    ) -> Poll<io::Result<usize>> {
138        self.0.poll_write_buf_priv(cx, buf)
139    }
140
141    #[inline]
142    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
143        // tcp flush is a no-op
144        Poll::Ready(Ok(()))
145    }
146
147    // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
148    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
149        self.0.shutdown(Shutdown::Write).into()
150    }
151}
152
153impl AsRef<TcpStream> for ReadHalf<'_> {
154    fn as_ref(&self) -> &TcpStream {
155        self.0
156    }
157}
158
159impl AsRef<TcpStream> for WriteHalf<'_> {
160    fn as_ref(&self) -> &TcpStream {
161        self.0
162    }
163}