async_std/net/tcp/listener.rs
1use std::fmt;
2use std::net::SocketAddr;
3use std::net::TcpStream as StdTcpStream;
4use std::pin::Pin;
5
6use async_io::Async;
7
8use crate::io;
9use crate::net::{TcpStream, ToSocketAddrs};
10use crate::stream::Stream;
11use crate::sync::Arc;
12use crate::task::{ready, Context, Poll};
13
14/// A TCP socket server, listening for connections.
15///
16/// After creating a `TcpListener` by [`bind`]ing it to a socket address, it listens for incoming
17/// TCP connections. These can be accepted by awaiting elements from the async stream of
18/// [`incoming`] connections.
19///
20/// The socket will be closed when the value is dropped.
21///
22/// The Transmission Control Protocol is specified in [IETF RFC 793].
23///
24/// This type is an async version of [`std::net::TcpListener`].
25///
26/// [`bind`]: #method.bind
27/// [`incoming`]: #method.incoming
28/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793
29/// [`std::net::TcpListener`]: https://doc.rust-lang.org/std/net/struct.TcpListener.html
30///
31/// # Examples
32///
33/// ```no_run
34/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
35/// #
36/// use async_std::io;
37/// use async_std::net::TcpListener;
38/// use async_std::prelude::*;
39///
40/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
41/// let mut incoming = listener.incoming();
42///
43/// while let Some(stream) = incoming.next().await {
44/// let stream = stream?;
45/// let (reader, writer) = &mut (&stream, &stream);
46/// io::copy(reader, writer).await?;
47/// }
48/// #
49/// # Ok(()) }) }
50/// ```
51#[derive(Debug)]
52pub struct TcpListener {
53 watcher: Async<std::net::TcpListener>,
54}
55
56impl TcpListener {
57 /// Creates a new `TcpListener` which will be bound to the specified address.
58 ///
59 /// The returned listener is ready for accepting connections.
60 ///
61 /// Binding with a port number of 0 will request that the OS assigns a port to this listener.
62 /// The port allocated can be queried via the [`local_addr`] method.
63 ///
64 /// # Examples
65 /// Create a TCP listener bound to 127.0.0.1:0:
66 ///
67 /// ```no_run
68 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
69 /// #
70 /// use async_std::net::TcpListener;
71 ///
72 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
73 /// #
74 /// # Ok(()) }) }
75 /// ```
76 ///
77 /// [`local_addr`]: #method.local_addr
78 pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> {
79 let mut last_err = None;
80 let addrs = addrs.to_socket_addrs().await?;
81
82 for addr in addrs {
83 match Async::<std::net::TcpListener>::bind(addr) {
84 Ok(listener) => {
85 return Ok(TcpListener { watcher: listener });
86 }
87 Err(err) => last_err = Some(err),
88 }
89 }
90
91 Err(last_err.unwrap_or_else(|| {
92 io::Error::new(
93 io::ErrorKind::InvalidInput,
94 "could not resolve to any addresses",
95 )
96 }))
97 }
98
99 /// Accepts a new incoming connection to this listener.
100 ///
101 /// When a connection is established, the corresponding stream and address will be returned.
102 ///
103 /// ## Examples
104 ///
105 /// ```no_run
106 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
107 /// #
108 /// use async_std::net::TcpListener;
109 ///
110 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
111 /// let (stream, addr) = listener.accept().await?;
112 /// #
113 /// # Ok(()) }) }
114 /// ```
115 pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
116 let (stream, addr) = self.watcher.accept().await?;
117 let stream = TcpStream {
118 watcher: Arc::new(stream),
119 };
120 Ok((stream, addr))
121 }
122
123 /// Returns a stream of incoming connections.
124 ///
125 /// Iterating over this stream is equivalent to calling [`accept`] in a loop. The stream of
126 /// connections is infinite, i.e awaiting the next connection will never result in [`None`].
127 ///
128 /// [`accept`]: #method.accept
129 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
130 ///
131 /// ## Examples
132 ///
133 /// ```no_run
134 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
135 /// #
136 /// use async_std::net::TcpListener;
137 /// use async_std::prelude::*;
138 ///
139 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
140 /// let mut incoming = listener.incoming();
141 ///
142 /// while let Some(stream) = incoming.next().await {
143 /// let mut stream = stream?;
144 /// stream.write_all(b"hello world").await?;
145 /// }
146 /// #
147 /// # Ok(()) }) }
148 /// ```
149 pub fn incoming(&self) -> Incoming<'_> {
150 Incoming {
151 incoming: Box::pin(self.watcher.incoming()),
152 }
153 }
154
155 /// Turn this into a stream over the connections being received on this
156 /// listener.
157 ///
158 /// The returned stream is infinite and will also not yield
159 /// the peer's [`SocketAddr`] structure. Iterating over it is equivalent to
160 /// calling [`TcpListener::accept`] in a loop.
161 ///
162 /// ## Examples
163 ///
164 /// Merge the incoming connections of multiple sockets into one [`Stream`]:
165 ///
166 /// ```no_run
167 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
168 /// #
169 /// use async_std::net::TcpListener;
170 ///
171 /// // Our server listens on multiple ports for some reason
172 /// let listeners = vec![
173 /// TcpListener::bind("[::0]:8080").await?,
174 /// TcpListener::bind("[::0]:12345").await?,
175 /// TcpListener::bind("[::0]:5678").await?,
176 /// ];
177 /// // Iterate over all incoming connections
178 /// let incoming = futures::stream::select_all(
179 /// listeners.into_iter()
180 /// .map(TcpListener::into_incoming)
181 /// .map(Box::pin)
182 /// );
183 /// #
184 /// # Ok(()) }) }
185 /// ```
186 #[cfg(feature = "unstable")]
187 pub fn into_incoming(self) -> impl Stream<Item = io::Result<TcpStream>> + Send {
188 futures_lite::stream::unfold(self, |listener| async move {
189 let res = listener.accept().await.map(|(stream, _)| stream);
190 Some((res, listener))
191 })
192 }
193
194 /// Returns the local address that this listener is bound to.
195 ///
196 /// This can be useful, for example, to identify when binding to port 0 which port was assigned
197 /// by the OS.
198 ///
199 /// # Examples
200 ///
201 /// ```no_run
202 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
203 /// #
204 /// use async_std::net::TcpListener;
205 ///
206 /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
207 /// let addr = listener.local_addr()?;
208 /// #
209 /// # Ok(()) }) }
210 /// ```
211 pub fn local_addr(&self) -> io::Result<SocketAddr> {
212 self.watcher.get_ref().local_addr()
213 }
214}
215
216/// A stream of incoming TCP connections.
217///
218/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is
219/// created by the [`incoming`] method on [`TcpListener`].
220///
221/// This type is an async version of [`std::net::Incoming`].
222///
223/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
224/// [`incoming`]: struct.TcpListener.html#method.incoming
225/// [`TcpListener`]: struct.TcpListener.html
226/// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html
227pub struct Incoming<'a> {
228 incoming: Pin<Box<dyn Stream<Item = io::Result<Async<StdTcpStream>>> + Send + Sync + 'a>>,
229}
230
231impl Stream for Incoming<'_> {
232 type Item = io::Result<TcpStream>;
233
234 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
235 let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
236 Poll::Ready(res.map(|res| res.map(|stream| TcpStream { watcher: Arc::new(stream) })))
237 }
238}
239
240impl fmt::Debug for Incoming<'_> {
241 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242 write!(f, "Incoming {{ ... }}")
243 }
244}
245
246impl From<std::net::TcpListener> for TcpListener {
247 /// Converts a `std::net::TcpListener` into its asynchronous equivalent.
248 fn from(listener: std::net::TcpListener) -> TcpListener {
249 TcpListener {
250 watcher: Async::new(listener).expect("TcpListener is known to be good"),
251 }
252 }
253}
254
255impl std::convert::TryFrom<TcpListener> for std::net::TcpListener {
256 type Error = io::Error;
257 /// Converts a `TcpListener` into its synchronous equivalent.
258 fn try_from(listener: TcpListener) -> io::Result<std::net::TcpListener> {
259 let inner = listener.watcher.into_inner()?;
260 inner.set_nonblocking(false)?;
261 Ok(inner)
262 }
263}
264
265cfg_unix! {
266 use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
267
268 impl AsRawFd for TcpListener {
269 fn as_raw_fd(&self) -> RawFd {
270 self.watcher.get_ref().as_raw_fd()
271 }
272 }
273
274 impl FromRawFd for TcpListener {
275 unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
276 std::net::TcpListener::from_raw_fd(fd).into()
277 }
278 }
279
280 impl IntoRawFd for TcpListener {
281 fn into_raw_fd(self) -> RawFd {
282 self.watcher.into_inner().unwrap().into_raw_fd()
283 }
284 }
285
286 cfg_io_safety! {
287 use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
288
289 impl AsFd for TcpListener {
290 fn as_fd(&self) -> BorrowedFd<'_> {
291 self.watcher.get_ref().as_fd()
292 }
293 }
294
295 impl From<OwnedFd> for TcpListener {
296 fn from(fd: OwnedFd) -> TcpListener {
297 std::net::TcpListener::from(fd).into()
298 }
299 }
300
301 impl From<TcpListener> for OwnedFd {
302 fn from(listener: TcpListener) -> OwnedFd {
303 listener.watcher.into_inner().unwrap().into()
304 }
305 }
306 }
307}
308
309cfg_windows! {
310 use crate::os::windows::io::{
311 AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket,
312 };
313
314 impl AsRawSocket for TcpListener {
315 fn as_raw_socket(&self) -> RawSocket {
316 self.watcher.as_raw_socket()
317 }
318 }
319
320 impl FromRawSocket for TcpListener {
321 unsafe fn from_raw_socket(handle: RawSocket) -> TcpListener {
322 std::net::TcpListener::from_raw_socket(handle).into()
323 }
324 }
325
326 impl IntoRawSocket for TcpListener {
327 fn into_raw_socket(self) -> RawSocket {
328 self.watcher.into_inner().unwrap().into_raw_socket()
329 }
330 }
331
332 cfg_io_safety! {
333 use crate::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket};
334
335 impl AsSocket for TcpListener {
336 fn as_socket(&self) -> BorrowedSocket<'_> {
337 self.watcher.get_ref().as_socket()
338 }
339 }
340
341 impl From<OwnedSocket> for TcpListener {
342 fn from(fd: OwnedSocket) -> TcpListener {
343 std::net::TcpListener::from(fd).into()
344 }
345 }
346
347 impl From<TcpListener> for OwnedSocket {
348 fn from(listener: TcpListener) -> OwnedSocket {
349 listener.watcher.into_inner().unwrap().into()
350 }
351 }
352 }
353}