compio_net/tcp.rs
1use std::{future::Future, io, net::SocketAddr};
2
3use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
4use compio_driver::impl_raw_fd;
5use compio_io::{AsyncRead, AsyncWrite};
6use socket2::{Protocol, SockAddr, Socket as Socket2, Type};
7
8use crate::{
9 OwnedReadHalf, OwnedWriteHalf, PollFd, ReadHalf, Socket, ToSocketAddrsAsync, WriteHalf,
10};
11
12/// A TCP socket server, listening for connections.
13///
14/// You can accept a new connection by using the
15/// [`accept`](`TcpListener::accept`) method.
16///
17/// # Examples
18///
19/// ```
20/// use std::net::SocketAddr;
21///
22/// use compio_io::{AsyncReadExt, AsyncWriteExt};
23/// use compio_net::{TcpListener, TcpStream};
24/// use socket2::SockAddr;
25///
26/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
27/// let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
28///
29/// let addr = listener.local_addr().unwrap();
30///
31/// let tx_fut = TcpStream::connect(&addr);
32///
33/// let rx_fut = listener.accept();
34///
35/// let (mut tx, (mut rx, _)) = futures_util::try_join!(tx_fut, rx_fut).unwrap();
36///
37/// tx.write_all("test").await.0.unwrap();
38///
39/// let (_, buf) = rx.read_exact(Vec::with_capacity(4)).await.unwrap();
40///
41/// assert_eq!(buf, b"test");
42/// # });
43/// ```
44#[derive(Debug, Clone)]
45pub struct TcpListener {
46 inner: Socket,
47}
48
49impl TcpListener {
50 /// Creates a new `TcpListener`, which will be bound to the specified
51 /// address.
52 ///
53 /// The returned listener is ready for accepting connections.
54 ///
55 /// Binding with a port number of 0 will request that the OS assigns a port
56 /// to this listener.
57 pub async fn bind(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
58 super::each_addr(addr, |addr| async move {
59 let socket =
60 Socket::bind(&SockAddr::from(addr), Type::STREAM, Some(Protocol::TCP)).await?;
61 socket.listen(128)?;
62 Ok(Self { inner: socket })
63 })
64 .await
65 }
66
67 /// Close the socket. If the returned future is dropped before polling, the
68 /// socket won't be closed.
69 pub fn close(self) -> impl Future<Output = io::Result<()>> {
70 self.inner.close()
71 }
72
73 /// Accepts a new incoming connection from this listener.
74 ///
75 /// This function will yield once a new TCP connection is established. When
76 /// established, the corresponding [`TcpStream`] and the remote peer's
77 /// address will be returned.
78 pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
79 let (socket, addr) = self.inner.accept().await?;
80 let stream = TcpStream { inner: socket };
81 Ok((stream, addr.as_socket().expect("should be SocketAddr")))
82 }
83
84 /// Returns the local address that this listener is bound to.
85 ///
86 /// This can be useful, for example, when binding to port 0 to
87 /// figure out which port was actually bound.
88 ///
89 /// # Examples
90 ///
91 /// ```
92 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
93 ///
94 /// use compio_net::TcpListener;
95 /// use socket2::SockAddr;
96 ///
97 /// # compio_runtime::Runtime::new().unwrap().block_on(async {
98 /// let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
99 ///
100 /// let addr = listener.local_addr().expect("Couldn't get local address");
101 /// assert_eq!(
102 /// addr,
103 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))
104 /// );
105 /// # });
106 /// ```
107 pub fn local_addr(&self) -> io::Result<SocketAddr> {
108 self.inner
109 .local_addr()
110 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
111 }
112}
113
// NOTE(review): `impl_raw_fd!` is imported from `compio_driver`. It presumably
// generates the raw fd / socket-handle trait impls for `TcpListener` by going
// through the `inner` field — confirm against the macro definition.
114impl_raw_fd!(TcpListener, socket2::Socket, inner, socket);
115
116/// A TCP stream between a local and a remote socket.
117///
118/// A TCP stream can either be created by connecting to an endpoint, via the
119/// `connect` method, or by accepting a connection from a listener.
120///
121/// # Examples
122///
123/// ```no_run
124/// use std::net::SocketAddr;
125///
126/// use compio_io::AsyncWrite;
127/// use compio_net::TcpStream;
128///
129/// # compio_runtime::Runtime::new().unwrap().block_on(async {
130/// // Connect to a peer
131/// let mut stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
132///
133/// // Write some data.
134/// stream.write("hello world!").await.unwrap();
135/// # })
136/// ```
137#[derive(Debug, Clone)]
138pub struct TcpStream {
139 inner: Socket,
140}
141
142impl TcpStream {
143 /// Opens a TCP connection to a remote host.
144 pub async fn connect(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
145 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
146
147 super::each_addr(addr, |addr| async move {
148 let addr2 = SockAddr::from(addr);
149 let socket = if cfg!(windows) {
150 let bind_addr = if addr.is_ipv4() {
151 SockAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
152 } else if addr.is_ipv6() {
153 SockAddr::from(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0))
154 } else {
155 return Err(io::Error::new(
156 io::ErrorKind::AddrNotAvailable,
157 "Unsupported address domain.",
158 ));
159 };
160 Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP)).await?
161 } else {
162 Socket::new(addr2.domain(), Type::STREAM, Some(Protocol::TCP)).await?
163 };
164 socket.connect_async(&addr2).await?;
165 Ok(Self { inner: socket })
166 })
167 .await
168 }
169
170 /// Creates new TcpStream from a std::net::TcpStream.
171 pub fn from_std(stream: std::net::TcpStream) -> io::Result<Self> {
172 Ok(Self {
173 inner: Socket::from_socket2(Socket2::from(stream))?,
174 })
175 }
176
177 /// Close the socket. If the returned future is dropped before polling, the
178 /// socket won't be closed.
179 pub fn close(self) -> impl Future<Output = io::Result<()>> {
180 self.inner.close()
181 }
182
183 /// Returns the socket address of the remote peer of this TCP connection.
184 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
185 self.inner
186 .peer_addr()
187 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
188 }
189
190 /// Returns the socket address of the local half of this TCP connection.
191 pub fn local_addr(&self) -> io::Result<SocketAddr> {
192 self.inner
193 .local_addr()
194 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
195 }
196
197 /// Splits a [`TcpStream`] into a read half and a write half, which can be
198 /// used to read and write the stream concurrently.
199 ///
200 /// This method is more efficient than
201 /// [`into_split`](TcpStream::into_split), but the halves cannot
202 /// be moved into independently spawned tasks.
203 pub fn split(&self) -> (ReadHalf<Self>, WriteHalf<Self>) {
204 crate::split(self)
205 }
206
207 /// Splits a [`TcpStream`] into a read half and a write half, which can be
208 /// used to read and write the stream concurrently.
209 ///
210 /// Unlike [`split`](TcpStream::split), the owned halves can be moved to
211 /// separate tasks, however this comes at the cost of a heap allocation.
212 pub fn into_split(self) -> (OwnedReadHalf<Self>, OwnedWriteHalf<Self>) {
213 crate::into_split(self)
214 }
215
216 /// Create [`PollFd`] from inner socket.
217 pub fn to_poll_fd(&self) -> io::Result<PollFd<Socket2>> {
218 self.inner.to_poll_fd()
219 }
220
221 /// Create [`PollFd`] from inner socket.
222 pub fn into_poll_fd(self) -> io::Result<PollFd<Socket2>> {
223 self.inner.into_poll_fd()
224 }
225}
226
227impl AsyncRead for TcpStream {
228 #[inline]
229 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
230 (&*self).read(buf).await
231 }
232
233 #[inline]
234 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
235 (&*self).read_vectored(buf).await
236 }
237}
238
239impl AsyncRead for &TcpStream {
240 #[inline]
241 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
242 self.inner.recv(buf).await
243 }
244
245 #[inline]
246 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
247 self.inner.recv_vectored(buf).await
248 }
249}
250
251impl AsyncWrite for TcpStream {
252 #[inline]
253 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
254 (&*self).write(buf).await
255 }
256
257 #[inline]
258 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
259 (&*self).write_vectored(buf).await
260 }
261
262 #[inline]
263 async fn flush(&mut self) -> io::Result<()> {
264 (&*self).flush().await
265 }
266
267 #[inline]
268 async fn shutdown(&mut self) -> io::Result<()> {
269 (&*self).shutdown().await
270 }
271}
272
273impl AsyncWrite for &TcpStream {
274 #[inline]
275 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
276 self.inner.send(buf).await
277 }
278
279 #[inline]
280 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
281 self.inner.send_vectored(buf).await
282 }
283
284 #[inline]
285 async fn flush(&mut self) -> io::Result<()> {
286 Ok(())
287 }
288
289 #[inline]
290 async fn shutdown(&mut self) -> io::Result<()> {
291 self.inner.shutdown().await
292 }
293}
294
// NOTE(review): `impl_raw_fd!` is imported from `compio_driver`. It presumably
// generates the raw fd / socket-handle trait impls for `TcpStream` by going
// through the `inner` field — confirm against the macro definition.
295impl_raw_fd!(TcpStream, socket2::Socket, inner, socket);