broker_tokio/net/tcp/split.rs
1//! `TcpStream` split support.
2//!
3//! A `TcpStream` can be split into a `ReadHalf` and a
4//! `WriteHalf` with the `TcpStream::split` method. `ReadHalf`
5//! implements `AsyncRead` while `WriteHalf` implements `AsyncWrite`.
6//!
7//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8//! split has no associated overhead and enforces all invariants at the type
9//! level.
10
11use crate::future::poll_fn;
12use crate::io::{AsyncRead, AsyncWrite};
13use crate::net::TcpStream;
14
15use bytes::Buf;
16use std::io;
17use std::mem::MaybeUninit;
18use std::net::Shutdown;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21
22/// Read half of a `TcpStream`.
23#[derive(Debug)]
24pub struct ReadHalf<'a>(&'a TcpStream);
25
26/// Write half of a `TcpStream`.
27///
28/// Note that in the `AsyncWrite` implemenation of `TcpStreamWriteHalf`,
29/// `poll_shutdown` actually shuts down the TCP stream in the write direction.
30#[derive(Debug)]
31pub struct WriteHalf<'a>(&'a TcpStream);
32
33pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
34 (ReadHalf(&*stream), WriteHalf(&*stream))
35}
36
37impl ReadHalf<'_> {
38 /// Attempt to receive data on the socket, without removing that data from
39 /// the queue, registering the current task for wakeup if data is not yet
40 /// available.
41 ///
42 /// See the [`TcpStream::poll_peek`] level documenation for more details.
43 ///
44 /// # Examples
45 ///
46 /// ```no_run
47 /// use tokio::io;
48 /// use tokio::net::TcpStream;
49 ///
50 /// use futures::future::poll_fn;
51 ///
52 /// #[tokio::main]
53 /// async fn main() -> io::Result<()> {
54 /// let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
55 /// let (mut read_half, _) = stream.split();
56 /// let mut buf = [0; 10];
57 ///
58 /// poll_fn(|cx| {
59 /// read_half.poll_peek(cx, &mut buf)
60 /// }).await?;
61 ///
62 /// Ok(())
63 /// }
64 /// ```
65 ///
66 /// [`TcpStream::poll_peek`]: TcpStream::poll_peek
67 pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
68 self.0.poll_peek2(cx, buf)
69 }
70
71 /// Receives data on the socket from the remote address to which it is
72 /// connected, without removing that data from the queue. On success,
73 /// returns the number of bytes peeked.
74 ///
75 /// See the [`TcpStream::peek`] level documenation for more details.
76 ///
77 /// # Examples
78 ///
79 /// ```no_run
80 /// use tokio::net::TcpStream;
81 /// use tokio::prelude::*;
82 /// use std::error::Error;
83 ///
84 /// #[tokio::main]
85 /// async fn main() -> Result<(), Box<dyn Error>> {
86 /// // Connect to a peer
87 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
88 /// let (mut read_half, _) = stream.split();
89 ///
90 /// let mut b1 = [0; 10];
91 /// let mut b2 = [0; 10];
92 ///
93 /// // Peek at the data
94 /// let n = read_half.peek(&mut b1).await?;
95 ///
96 /// // Read the data
97 /// assert_eq!(n, read_half.read(&mut b2[..n]).await?);
98 /// assert_eq!(&b1[..n], &b2[..n]);
99 ///
100 /// Ok(())
101 /// }
102 /// ```
103 ///
104 /// [`TcpStream::peek`]: TcpStream::peek
105 pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
106 poll_fn(|cx| self.poll_peek(cx, buf)).await
107 }
108}
109
110impl AsyncRead for ReadHalf<'_> {
111 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
112 false
113 }
114
115 fn poll_read(
116 self: Pin<&mut Self>,
117 cx: &mut Context<'_>,
118 buf: &mut [u8],
119 ) -> Poll<io::Result<usize>> {
120 self.0.poll_read_priv(cx, buf)
121 }
122}
123
124impl AsyncWrite for WriteHalf<'_> {
125 fn poll_write(
126 self: Pin<&mut Self>,
127 cx: &mut Context<'_>,
128 buf: &[u8],
129 ) -> Poll<io::Result<usize>> {
130 self.0.poll_write_priv(cx, buf)
131 }
132
133 fn poll_write_buf<B: Buf>(
134 self: Pin<&mut Self>,
135 cx: &mut Context<'_>,
136 buf: &mut B,
137 ) -> Poll<io::Result<usize>> {
138 self.0.poll_write_buf_priv(cx, buf)
139 }
140
141 #[inline]
142 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
143 // tcp flush is a no-op
144 Poll::Ready(Ok(()))
145 }
146
147 // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
148 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
149 self.0.shutdown(Shutdown::Write).into()
150 }
151}
152
153impl AsRef<TcpStream> for ReadHalf<'_> {
154 fn as_ref(&self) -> &TcpStream {
155 self.0
156 }
157}
158
159impl AsRef<TcpStream> for WriteHalf<'_> {
160 fn as_ref(&self) -> &TcpStream {
161 self.0
162 }
163}