async_std/net/tcp/
listener.rs

1use std::fmt;
2use std::net::SocketAddr;
3use std::net::TcpStream as StdTcpStream;
4use std::pin::Pin;
5
6use async_io::Async;
7
8use crate::io;
9use crate::net::{TcpStream, ToSocketAddrs};
10use crate::stream::Stream;
11use crate::sync::Arc;
12use crate::task::{ready, Context, Poll};
13
/// A TCP socket server, listening for connections.
///
/// After creating a `TcpListener` by [`bind`]ing it to a socket address, it listens for incoming
/// TCP connections. These can be accepted one at a time with [`accept`], or by awaiting elements
/// from the async stream of [`incoming`] connections.
///
/// The socket will be closed when the value is dropped.
///
/// The Transmission Control Protocol is specified in [IETF RFC 793].
///
/// This type is an async version of [`std::net::TcpListener`].
///
/// [`bind`]: #method.bind
/// [`accept`]: #method.accept
/// [`incoming`]: #method.incoming
/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793
/// [`std::net::TcpListener`]: https://doc.rust-lang.org/std/net/struct.TcpListener.html
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
/// use async_std::net::TcpListener;
/// use async_std::prelude::*;
///
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
/// let mut incoming = listener.incoming();
///
/// while let Some(stream) = incoming.next().await {
///     let stream = stream?;
///     let (reader, writer) = &mut (&stream, &stream);
///     io::copy(reader, writer).await?;
/// }
/// #
/// # Ok(()) }) }
/// ```
#[derive(Debug)]
pub struct TcpListener {
    // The standard-library listener wrapped in `async_io::Async`; every method on this type
    // goes through this wrapper.
    watcher: Async<std::net::TcpListener>,
}
55
56impl TcpListener {
57    /// Creates a new `TcpListener` which will be bound to the specified address.
58    ///
59    /// The returned listener is ready for accepting connections.
60    ///
61    /// Binding with a port number of 0 will request that the OS assigns a port to this listener.
62    /// The port allocated can be queried via the [`local_addr`] method.
63    ///
64    /// # Examples
65    /// Create a TCP listener bound to 127.0.0.1:0:
66    ///
67    /// ```no_run
68    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
69    /// #
70    /// use async_std::net::TcpListener;
71    ///
72    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
73    /// #
74    /// # Ok(()) }) }
75    /// ```
76    ///
77    /// [`local_addr`]: #method.local_addr
78    pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> {
79        let mut last_err = None;
80        let addrs = addrs.to_socket_addrs().await?;
81
82        for addr in addrs {
83            match Async::<std::net::TcpListener>::bind(addr) {
84                Ok(listener) => {
85                    return Ok(TcpListener { watcher: listener });
86                }
87                Err(err) => last_err = Some(err),
88            }
89        }
90
91        Err(last_err.unwrap_or_else(|| {
92            io::Error::new(
93                io::ErrorKind::InvalidInput,
94                "could not resolve to any addresses",
95            )
96        }))
97    }
98
99    /// Accepts a new incoming connection to this listener.
100    ///
101    /// When a connection is established, the corresponding stream and address will be returned.
102    ///
103    /// ## Examples
104    ///
105    /// ```no_run
106    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
107    /// #
108    /// use async_std::net::TcpListener;
109    ///
110    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
111    /// let (stream, addr) = listener.accept().await?;
112    /// #
113    /// # Ok(()) }) }
114    /// ```
115    pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
116        let (stream, addr) = self.watcher.accept().await?;
117        let stream = TcpStream {
118            watcher: Arc::new(stream),
119        };
120        Ok((stream, addr))
121    }
122
123    /// Returns a stream of incoming connections.
124    ///
125    /// Iterating over this stream is equivalent to calling [`accept`] in a loop. The stream of
126    /// connections is infinite, i.e awaiting the next connection will never result in [`None`].
127    ///
128    /// [`accept`]: #method.accept
129    /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
130    ///
131    /// ## Examples
132    ///
133    /// ```no_run
134    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
135    /// #
136    /// use async_std::net::TcpListener;
137    /// use async_std::prelude::*;
138    ///
139    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
140    /// let mut incoming = listener.incoming();
141    ///
142    /// while let Some(stream) = incoming.next().await {
143    ///     let mut stream = stream?;
144    ///     stream.write_all(b"hello world").await?;
145    /// }
146    /// #
147    /// # Ok(()) }) }
148    /// ```
149    pub fn incoming(&self) -> Incoming<'_> {
150        Incoming {
151            incoming: Box::pin(self.watcher.incoming()),
152        }
153    }    
154
155    /// Turn this into a stream over the connections being received on this
156    /// listener.
157    ///
158    /// The returned stream is infinite and will also not yield
159    /// the peer's [`SocketAddr`] structure. Iterating over it is equivalent to
160    /// calling [`TcpListener::accept`] in a loop.
161    ///
162    /// ## Examples
163    ///
164    /// Merge the incoming connections of multiple sockets into one [`Stream`]:
165    ///
166    /// ```no_run
167    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
168    /// #
169    /// use async_std::net::TcpListener;
170    ///
171    /// // Our server listens on multiple ports for some reason
172    /// let listeners = vec![
173    ///     TcpListener::bind("[::0]:8080").await?,
174    ///     TcpListener::bind("[::0]:12345").await?,
175    ///     TcpListener::bind("[::0]:5678").await?,
176    /// ];
177    /// // Iterate over all incoming connections
178    /// let incoming = futures::stream::select_all(
179    ///     listeners.into_iter()
180    ///         .map(TcpListener::into_incoming)
181    ///         .map(Box::pin)
182    /// );
183    /// #
184    /// # Ok(()) }) }
185    /// ```
186    #[cfg(feature = "unstable")]
187    pub fn into_incoming(self) -> impl Stream<Item = io::Result<TcpStream>> + Send {
188        futures_lite::stream::unfold(self, |listener| async move {
189            let res = listener.accept().await.map(|(stream, _)| stream);
190            Some((res, listener))
191        })
192    }
193
194    /// Returns the local address that this listener is bound to.
195    ///
196    /// This can be useful, for example, to identify when binding to port 0 which port was assigned
197    /// by the OS.
198    ///
199    /// # Examples
200    ///
201    /// ```no_run
202    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
203    /// #
204    /// use async_std::net::TcpListener;
205    ///
206    /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
207    /// let addr = listener.local_addr()?;
208    /// #
209    /// # Ok(()) }) }
210    /// ```
211    pub fn local_addr(&self) -> io::Result<SocketAddr> {
212        self.watcher.get_ref().local_addr()
213    }
214}
215
/// A stream of incoming TCP connections.
///
/// This stream is infinite, i.e. awaiting the next connection will never result in [`None`]. It is
/// created by the [`incoming`] method on [`TcpListener`].
///
/// This type is an async version of [`std::net::Incoming`].
///
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
/// [`incoming`]: struct.TcpListener.html#method.incoming
/// [`TcpListener`]: struct.TcpListener.html
/// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html
pub struct Incoming<'a> {
    // Type-erased, pinned stream of accepted `Async<std::net::TcpStream>` values; `'a` is the
    // borrow of the listener it was created from.
    incoming: Pin<Box<dyn Stream<Item = io::Result<Async<StdTcpStream>>> + Send + Sync + 'a>>,
}
230
231impl Stream for Incoming<'_> {
232    type Item = io::Result<TcpStream>;
233
234    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
235        let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
236        Poll::Ready(res.map(|res| res.map(|stream| TcpStream { watcher: Arc::new(stream) })))
237    }
238}
239
240impl fmt::Debug for Incoming<'_> {
241    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242        write!(f, "Incoming {{ ... }}")
243    }
244}
245
246impl From<std::net::TcpListener> for TcpListener {
247    /// Converts a `std::net::TcpListener` into its asynchronous equivalent.
248    fn from(listener: std::net::TcpListener) -> TcpListener {
249        TcpListener {
250            watcher: Async::new(listener).expect("TcpListener is known to be good"),
251        }
252    }
253}
254
255impl std::convert::TryFrom<TcpListener> for std::net::TcpListener {
256    type Error = io::Error;
257    /// Converts a `TcpListener` into its synchronous equivalent.
258    fn try_from(listener: TcpListener) -> io::Result<std::net::TcpListener> {
259        let inner = listener.watcher.into_inner()?;
260        inner.set_nonblocking(false)?;
261        Ok(inner)
262    }
263}
264
265cfg_unix! {
266    use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
267
268    impl AsRawFd for TcpListener {
269        fn as_raw_fd(&self) -> RawFd {
270            self.watcher.get_ref().as_raw_fd()
271        }
272    }
273
274    impl FromRawFd for TcpListener {
275        unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
276            std::net::TcpListener::from_raw_fd(fd).into()
277        }
278    }
279
280    impl IntoRawFd for TcpListener {
281        fn into_raw_fd(self) -> RawFd {
282            self.watcher.into_inner().unwrap().into_raw_fd()
283        }
284    }
285
286    cfg_io_safety! {
287        use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
288
289        impl AsFd for TcpListener {
290            fn as_fd(&self) -> BorrowedFd<'_> {
291                self.watcher.get_ref().as_fd()
292            }
293        }
294
295        impl From<OwnedFd> for TcpListener {
296            fn from(fd: OwnedFd) -> TcpListener {
297                std::net::TcpListener::from(fd).into()
298            }
299        }
300
301        impl From<TcpListener> for OwnedFd {
302            fn from(listener: TcpListener) -> OwnedFd {
303                listener.watcher.into_inner().unwrap().into()
304            }
305        }
306    }
307}
308
309cfg_windows! {
310    use crate::os::windows::io::{
311        AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket,
312    };
313
314    impl AsRawSocket for TcpListener {
315        fn as_raw_socket(&self) -> RawSocket {
316            self.watcher.as_raw_socket()
317        }
318    }
319
320    impl FromRawSocket for TcpListener {
321        unsafe fn from_raw_socket(handle: RawSocket) -> TcpListener {
322            std::net::TcpListener::from_raw_socket(handle).into()
323        }
324    }
325
326    impl IntoRawSocket for TcpListener {
327        fn into_raw_socket(self) -> RawSocket {
328            self.watcher.into_inner().unwrap().into_raw_socket()
329        }
330    }
331
332    cfg_io_safety! {
333        use crate::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket};
334
335        impl AsSocket for TcpListener {
336            fn as_socket(&self) -> BorrowedSocket<'_> {
337                self.watcher.get_ref().as_socket()
338            }
339        }
340
341        impl From<OwnedSocket> for TcpListener {
342            fn from(fd: OwnedSocket) -> TcpListener {
343                std::net::TcpListener::from(fd).into()
344            }
345        }
346
347        impl From<TcpListener> for OwnedSocket {
348            fn from(listener: TcpListener) -> OwnedSocket {
349                listener.watcher.into_inner().unwrap().into()
350            }
351        }
352    }
353}