madsim_real_tokio/net/tcp/
stream.rs

1cfg_not_wasi! {
2    use crate::future::poll_fn;
3    use crate::net::{to_socket_addrs, ToSocketAddrs};
4    use std::time::Duration;
5}
6
7use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
8use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
9use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
10
11use std::fmt;
12use std::io;
13use std::net::{Shutdown, SocketAddr};
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17cfg_io_util! {
18    use bytes::BufMut;
19}
20
21cfg_net! {
22    /// A TCP stream between a local and a remote socket.
23    ///
24    /// A TCP stream can either be created by connecting to an endpoint, via the
25    /// [`connect`] method, or by [accepting] a connection from a [listener]. A
26    /// TCP stream can also be created via the [`TcpSocket`] type.
27    ///
28    /// Reading and writing to a `TcpStream` is usually done using the
29    /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
30    /// traits.
31    ///
32    /// [`connect`]: method@TcpStream::connect
33    /// [accepting]: method@crate::net::TcpListener::accept
34    /// [listener]: struct@crate::net::TcpListener
35    /// [`TcpSocket`]: struct@crate::net::TcpSocket
36    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
37    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
38    ///
39    /// # Examples
40    ///
41    /// ```no_run
42    /// use tokio::net::TcpStream;
43    /// use tokio::io::AsyncWriteExt;
44    /// use std::error::Error;
45    ///
46    /// #[tokio::main]
47    /// async fn main() -> Result<(), Box<dyn Error>> {
48    ///     // Connect to a peer
49    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
50    ///
51    ///     // Write some data.
52    ///     stream.write_all(b"hello world!").await?;
53    ///
54    ///     Ok(())
55    /// }
56    /// ```
57    ///
58    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
59    ///
60    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
61    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
62    ///
63    /// To shut down the stream in the write direction, you can call the
64    /// [`shutdown()`] method. This will cause the other peer to receive a read of
65    /// length 0, indicating that no more data will be sent. This only closes
66    /// the stream in one direction.
67    ///
68    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
69    pub struct TcpStream {
70        io: PollEvented<mio::net::TcpStream>,
71    }
72}
73
74impl TcpStream {
75    cfg_not_wasi! {
76        /// Opens a TCP connection to a remote host.
77        ///
78        /// `addr` is an address of the remote host. Anything which implements the
79        /// [`ToSocketAddrs`] trait can be supplied as the address.  If `addr`
80        /// yields multiple addresses, connect will be attempted with each of the
81        /// addresses until a connection is successful. If none of the addresses
82        /// result in a successful connection, the error returned from the last
83        /// connection attempt (the last address) is returned.
84        ///
85        /// To configure the socket before connecting, you can use the [`TcpSocket`]
86        /// type.
87        ///
88        /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
89        /// [`TcpSocket`]: struct@crate::net::TcpSocket
90        ///
91        /// # Examples
92        ///
93        /// ```no_run
94        /// use tokio::net::TcpStream;
95        /// use tokio::io::AsyncWriteExt;
96        /// use std::error::Error;
97        ///
98        /// #[tokio::main]
99        /// async fn main() -> Result<(), Box<dyn Error>> {
100        ///     // Connect to a peer
101        ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
102        ///
103        ///     // Write some data.
104        ///     stream.write_all(b"hello world!").await?;
105        ///
106        ///     Ok(())
107        /// }
108        /// ```
109        ///
110        /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
111        ///
112        /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
113        /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
114        pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
115            let addrs = to_socket_addrs(addr).await?;
116
117            let mut last_err = None;
118
119            for addr in addrs {
120                match TcpStream::connect_addr(addr).await {
121                    Ok(stream) => return Ok(stream),
122                    Err(e) => last_err = Some(e),
123                }
124            }
125
126            Err(last_err.unwrap_or_else(|| {
127                io::Error::new(
128                    io::ErrorKind::InvalidInput,
129                    "could not resolve to any address",
130                )
131            }))
132        }
133
134        /// Establishes a connection to the specified `addr`.
135        async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
136            let sys = mio::net::TcpStream::connect(addr)?;
137            TcpStream::connect_mio(sys).await
138        }
139
140        pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
141            let stream = TcpStream::new(sys)?;
142
143            // Once we've connected, wait for the stream to be writable as
144            // that's when the actual connection has been initiated. Once we're
145            // writable we check for `take_socket_error` to see if the connect
146            // actually hit an error or not.
147            //
148            // If all that succeeded then we ship everything on up.
149            poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
150
151            if let Some(e) = stream.io.take_error()? {
152                return Err(e);
153            }
154
155            Ok(stream)
156        }
157    }
158
159    pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
160        let io = PollEvented::new(connected)?;
161        Ok(TcpStream { io })
162    }
163
164    /// Creates new `TcpStream` from a `std::net::TcpStream`.
165    ///
166    /// This function is intended to be used to wrap a TCP stream from the
167    /// standard library in the Tokio equivalent.
168    ///
169    /// # Notes
170    ///
171    /// The caller is responsible for ensuring that the stream is in
172    /// non-blocking mode. Otherwise all I/O operations on the stream
173    /// will block the thread, which will cause unexpected behavior.
174    /// Non-blocking mode can be set using [`set_nonblocking`].
175    ///
176    /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
177    ///
178    /// # Examples
179    ///
180    /// ```rust,no_run
181    /// use std::error::Error;
182    /// use tokio::net::TcpStream;
183    ///
184    /// #[tokio::main]
185    /// async fn main() -> Result<(), Box<dyn Error>> {
186    ///     let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
187    ///     std_stream.set_nonblocking(true)?;
188    ///     let stream = TcpStream::from_std(std_stream)?;
189    ///     Ok(())
190    /// }
191    /// ```
192    ///
193    /// # Panics
194    ///
195    /// This function panics if it is not called from within a runtime with
196    /// IO enabled.
197    ///
198    /// The runtime is usually set implicitly when this function is called
199    /// from a future driven by a tokio runtime, otherwise runtime can be set
200    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
201    #[track_caller]
202    pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
203        let io = mio::net::TcpStream::from_std(stream);
204        let io = PollEvented::new(io)?;
205        Ok(TcpStream { io })
206    }
207
208    /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
209    ///
210    /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
211    /// Use [`set_nonblocking`] to change the blocking mode if needed.
212    ///
213    /// # Examples
214    ///
215    /// ```
216    /// use std::error::Error;
217    /// use std::io::Read;
218    /// use tokio::net::TcpListener;
219    /// # use tokio::net::TcpStream;
220    /// # use tokio::io::AsyncWriteExt;
221    ///
222    /// #[tokio::main]
223    /// async fn main() -> Result<(), Box<dyn Error>> {
224    ///     let mut data = [0u8; 12];
225    /// #   if false {
226    ///     let listener = TcpListener::bind("127.0.0.1:34254").await?;
227    /// #   }
228    /// #   let listener = TcpListener::bind("127.0.0.1:0").await?;
229    /// #   let addr = listener.local_addr().unwrap();
230    /// #   let handle = tokio::spawn(async move {
231    /// #       let mut stream: TcpStream = TcpStream::connect(addr).await.unwrap();
232    /// #       stream.write_all(b"Hello world!").await.unwrap();
233    /// #   });
234    ///     let (tokio_tcp_stream, _) = listener.accept().await?;
235    ///     let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
236    /// #   handle.await.expect("The task being joined has panicked");
237    ///     std_tcp_stream.set_nonblocking(false)?;
238    ///     std_tcp_stream.read_exact(&mut data)?;
239    /// #   assert_eq!(b"Hello world!", &data);
240    ///     Ok(())
241    /// }
242    /// ```
243    /// [`tokio::net::TcpStream`]: TcpStream
244    /// [`std::net::TcpStream`]: std::net::TcpStream
245    /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
246    pub fn into_std(self) -> io::Result<std::net::TcpStream> {
247        #[cfg(unix)]
248        {
249            use std::os::unix::io::{FromRawFd, IntoRawFd};
250            self.io
251                .into_inner()
252                .map(IntoRawFd::into_raw_fd)
253                .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
254        }
255
256        #[cfg(windows)]
257        {
258            use std::os::windows::io::{FromRawSocket, IntoRawSocket};
259            self.io
260                .into_inner()
261                .map(|io| io.into_raw_socket())
262                .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
263        }
264
265        #[cfg(target_os = "wasi")]
266        {
267            use std::os::wasi::io::{FromRawFd, IntoRawFd};
268            self.io
269                .into_inner()
270                .map(|io| io.into_raw_fd())
271                .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
272        }
273    }
274
275    /// Returns the local address that this stream is bound to.
276    ///
277    /// # Examples
278    ///
279    /// ```no_run
280    /// use tokio::net::TcpStream;
281    ///
282    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
283    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
284    ///
285    /// println!("{:?}", stream.local_addr()?);
286    /// # Ok(())
287    /// # }
288    /// ```
289    pub fn local_addr(&self) -> io::Result<SocketAddr> {
290        self.io.local_addr()
291    }
292
293    /// Returns the value of the `SO_ERROR` option.
294    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
295        self.io.take_error()
296    }
297
298    /// Returns the remote address that this stream is connected to.
299    ///
300    /// # Examples
301    ///
302    /// ```no_run
303    /// use tokio::net::TcpStream;
304    ///
305    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
306    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
307    ///
308    /// println!("{:?}", stream.peer_addr()?);
309    /// # Ok(())
310    /// # }
311    /// ```
312    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
313        self.io.peer_addr()
314    }
315
316    /// Attempts to receive data on the socket, without removing that data from
317    /// the queue, registering the current task for wakeup if data is not yet
318    /// available.
319    ///
320    /// Note that on multiple calls to `poll_peek`, `poll_read` or
321    /// `poll_read_ready`, only the `Waker` from the `Context` passed to the
322    /// most recent call is scheduled to receive a wakeup. (However,
323    /// `poll_write` retains a second, independent waker.)
324    ///
325    /// # Return value
326    ///
327    /// The function returns:
328    ///
329    /// * `Poll::Pending` if data is not yet available.
330    /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
331    /// * `Poll::Ready(Err(e))` if an error is encountered.
332    ///
333    /// # Errors
334    ///
335    /// This function may encounter any standard I/O error except `WouldBlock`.
336    ///
337    /// # Examples
338    ///
339    /// ```no_run
340    /// use tokio::io::{self, ReadBuf};
341    /// use tokio::net::TcpStream;
342    ///
343    /// use futures::future::poll_fn;
344    ///
345    /// #[tokio::main]
346    /// async fn main() -> io::Result<()> {
347    ///     let stream = TcpStream::connect("127.0.0.1:8000").await?;
348    ///     let mut buf = [0; 10];
349    ///     let mut buf = ReadBuf::new(&mut buf);
350    ///
351    ///     poll_fn(|cx| {
352    ///         stream.poll_peek(cx, &mut buf)
353    ///     }).await?;
354    ///
355    ///     Ok(())
356    /// }
357    /// ```
358    pub fn poll_peek(
359        &self,
360        cx: &mut Context<'_>,
361        buf: &mut ReadBuf<'_>,
362    ) -> Poll<io::Result<usize>> {
363        loop {
364            let ev = ready!(self.io.registration().poll_read_ready(cx))?;
365
366            let b = unsafe {
367                &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
368            };
369
370            match self.io.peek(b) {
371                Ok(ret) => {
372                    unsafe { buf.assume_init(ret) };
373                    buf.advance(ret);
374                    return Poll::Ready(Ok(ret));
375                }
376                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
377                    self.io.registration().clear_readiness(ev);
378                }
379                Err(e) => return Poll::Ready(Err(e)),
380            }
381        }
382    }
383
384    /// Waits for any of the requested ready states.
385    ///
386    /// This function is usually paired with `try_read()` or `try_write()`. It
387    /// can be used to concurrently read / write to the same socket on a single
388    /// task without splitting the socket.
389    ///
390    /// The function may complete without the socket being ready. This is a
391    /// false-positive and attempting an operation will return with
392    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
393    /// [`Ready`] set, so you should always check the returned value and possibly
394    /// wait again if the requested states are not set.
395    ///
396    /// # Cancel safety
397    ///
398    /// This method is cancel safe. Once a readiness event occurs, the method
399    /// will continue to return immediately until the readiness event is
400    /// consumed by an attempt to read or write that fails with `WouldBlock` or
401    /// `Poll::Pending`.
402    ///
403    /// # Examples
404    ///
405    /// Concurrently read and write to the stream on the same task without
406    /// splitting.
407    ///
408    /// ```no_run
409    /// use tokio::io::Interest;
410    /// use tokio::net::TcpStream;
411    /// use std::error::Error;
412    /// use std::io;
413    ///
414    /// #[tokio::main]
415    /// async fn main() -> Result<(), Box<dyn Error>> {
416    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
417    ///
418    ///     loop {
419    ///         let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
420    ///
421    ///         if ready.is_readable() {
422    ///             let mut data = vec![0; 1024];
423    ///             // Try to read data, this may still fail with `WouldBlock`
424    ///             // if the readiness event is a false positive.
425    ///             match stream.try_read(&mut data) {
426    ///                 Ok(n) => {
427    ///                     println!("read {} bytes", n);
428    ///                 }
429    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
430    ///                     continue;
431    ///                 }
432    ///                 Err(e) => {
433    ///                     return Err(e.into());
434    ///                 }
435    ///             }
436    ///
437    ///         }
438    ///
439    ///         if ready.is_writable() {
440    ///             // Try to write data, this may still fail with `WouldBlock`
441    ///             // if the readiness event is a false positive.
442    ///             match stream.try_write(b"hello world") {
443    ///                 Ok(n) => {
444    ///                     println!("write {} bytes", n);
445    ///                 }
446    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
447    ///                     continue
448    ///                 }
449    ///                 Err(e) => {
450    ///                     return Err(e.into());
451    ///                 }
452    ///             }
453    ///         }
454    ///     }
455    /// }
456    /// ```
457    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
458        let event = self.io.registration().readiness(interest).await?;
459        Ok(event.ready)
460    }
461
462    /// Waits for the socket to become readable.
463    ///
464    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
465    /// paired with `try_read()`.
466    ///
467    /// # Cancel safety
468    ///
469    /// This method is cancel safe. Once a readiness event occurs, the method
470    /// will continue to return immediately until the readiness event is
471    /// consumed by an attempt to read that fails with `WouldBlock` or
472    /// `Poll::Pending`.
473    ///
474    /// # Examples
475    ///
476    /// ```no_run
477    /// use tokio::net::TcpStream;
478    /// use std::error::Error;
479    /// use std::io;
480    ///
481    /// #[tokio::main]
482    /// async fn main() -> Result<(), Box<dyn Error>> {
483    ///     // Connect to a peer
484    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
485    ///
486    ///     let mut msg = vec![0; 1024];
487    ///
488    ///     loop {
489    ///         // Wait for the socket to be readable
490    ///         stream.readable().await?;
491    ///
492    ///         // Try to read data, this may still fail with `WouldBlock`
493    ///         // if the readiness event is a false positive.
494    ///         match stream.try_read(&mut msg) {
495    ///             Ok(n) => {
496    ///                 msg.truncate(n);
497    ///                 break;
498    ///             }
499    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
500    ///                 continue;
501    ///             }
502    ///             Err(e) => {
503    ///                 return Err(e.into());
504    ///             }
505    ///         }
506    ///     }
507    ///
508    ///     println!("GOT = {:?}", msg);
509    ///     Ok(())
510    /// }
511    /// ```
512    pub async fn readable(&self) -> io::Result<()> {
513        self.ready(Interest::READABLE).await?;
514        Ok(())
515    }
516
517    /// Polls for read readiness.
518    ///
519    /// If the tcp stream is not currently ready for reading, this method will
520    /// store a clone of the `Waker` from the provided `Context`. When the tcp
521    /// stream becomes ready for reading, `Waker::wake` will be called on the
522    /// waker.
523    ///
524    /// Note that on multiple calls to `poll_read_ready`, `poll_read` or
525    /// `poll_peek`, only the `Waker` from the `Context` passed to the most
526    /// recent call is scheduled to receive a wakeup. (However,
527    /// `poll_write_ready` retains a second, independent waker.)
528    ///
529    /// This function is intended for cases where creating and pinning a future
530    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
531    /// preferred, as this supports polling from multiple tasks at once.
532    ///
533    /// # Return value
534    ///
535    /// The function returns:
536    ///
537    /// * `Poll::Pending` if the tcp stream is not ready for reading.
538    /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading.
539    /// * `Poll::Ready(Err(e))` if an error is encountered.
540    ///
541    /// # Errors
542    ///
543    /// This function may encounter any standard I/O error except `WouldBlock`.
544    ///
545    /// [`readable`]: method@Self::readable
546    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
547        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
548    }
549
550    /// Tries to read data from the stream into the provided buffer, returning how
551    /// many bytes were read.
552    ///
553    /// Receives any pending data from the socket but does not wait for new data
554    /// to arrive. On success, returns the number of bytes read. Because
555    /// `try_read()` is non-blocking, the buffer does not have to be stored by
556    /// the async task and can exist entirely on the stack.
557    ///
558    /// Usually, [`readable()`] or [`ready()`] is used with this function.
559    ///
560    /// [`readable()`]: TcpStream::readable()
561    /// [`ready()`]: TcpStream::ready()
562    ///
563    /// # Return
564    ///
565    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
566    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
567    ///
568    /// 1. The stream's read half is closed and will no longer yield data.
569    /// 2. The specified buffer was 0 bytes in length.
570    ///
571    /// If the stream is not ready to read data,
572    /// `Err(io::ErrorKind::WouldBlock)` is returned.
573    ///
574    /// # Examples
575    ///
576    /// ```no_run
577    /// use tokio::net::TcpStream;
578    /// use std::error::Error;
579    /// use std::io;
580    ///
581    /// #[tokio::main]
582    /// async fn main() -> Result<(), Box<dyn Error>> {
583    ///     // Connect to a peer
584    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
585    ///
586    ///     loop {
587    ///         // Wait for the socket to be readable
588    ///         stream.readable().await?;
589    ///
590    ///         // Creating the buffer **after** the `await` prevents it from
591    ///         // being stored in the async task.
592    ///         let mut buf = [0; 4096];
593    ///
594    ///         // Try to read data, this may still fail with `WouldBlock`
595    ///         // if the readiness event is a false positive.
596    ///         match stream.try_read(&mut buf) {
597    ///             Ok(0) => break,
598    ///             Ok(n) => {
599    ///                 println!("read {} bytes", n);
600    ///             }
601    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
602    ///                 continue;
603    ///             }
604    ///             Err(e) => {
605    ///                 return Err(e.into());
606    ///             }
607    ///         }
608    ///     }
609    ///
610    ///     Ok(())
611    /// }
612    /// ```
613    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
614        use std::io::Read;
615
616        self.io
617            .registration()
618            .try_io(Interest::READABLE, || (&*self.io).read(buf))
619    }
620
621    /// Tries to read data from the stream into the provided buffers, returning
622    /// how many bytes were read.
623    ///
624    /// Data is copied to fill each buffer in order, with the final buffer
625    /// written to possibly being only partially filled. This method behaves
626    /// equivalently to a single call to [`try_read()`] with concatenated
627    /// buffers.
628    ///
629    /// Receives any pending data from the socket but does not wait for new data
630    /// to arrive. On success, returns the number of bytes read. Because
631    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
632    /// stored by the async task and can exist entirely on the stack.
633    ///
634    /// Usually, [`readable()`] or [`ready()`] is used with this function.
635    ///
636    /// [`try_read()`]: TcpStream::try_read()
637    /// [`readable()`]: TcpStream::readable()
638    /// [`ready()`]: TcpStream::ready()
639    ///
640    /// # Return
641    ///
642    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
643    /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
644    /// and will no longer yield data. If the stream is not ready to read data
645    /// `Err(io::ErrorKind::WouldBlock)` is returned.
646    ///
647    /// # Examples
648    ///
649    /// ```no_run
650    /// use tokio::net::TcpStream;
651    /// use std::error::Error;
652    /// use std::io::{self, IoSliceMut};
653    ///
654    /// #[tokio::main]
655    /// async fn main() -> Result<(), Box<dyn Error>> {
656    ///     // Connect to a peer
657    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
658    ///
659    ///     loop {
660    ///         // Wait for the socket to be readable
661    ///         stream.readable().await?;
662    ///
663    ///         // Creating the buffer **after** the `await` prevents it from
664    ///         // being stored in the async task.
665    ///         let mut buf_a = [0; 512];
666    ///         let mut buf_b = [0; 1024];
667    ///         let mut bufs = [
668    ///             IoSliceMut::new(&mut buf_a),
669    ///             IoSliceMut::new(&mut buf_b),
670    ///         ];
671    ///
672    ///         // Try to read data, this may still fail with `WouldBlock`
673    ///         // if the readiness event is a false positive.
674    ///         match stream.try_read_vectored(&mut bufs) {
675    ///             Ok(0) => break,
676    ///             Ok(n) => {
677    ///                 println!("read {} bytes", n);
678    ///             }
679    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
680    ///                 continue;
681    ///             }
682    ///             Err(e) => {
683    ///                 return Err(e.into());
684    ///             }
685    ///         }
686    ///     }
687    ///
688    ///     Ok(())
689    /// }
690    /// ```
691    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
692        use std::io::Read;
693
694        self.io
695            .registration()
696            .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
697    }
698
699    cfg_io_util! {
700        /// Tries to read data from the stream into the provided buffer, advancing the
701        /// buffer's internal cursor, returning how many bytes were read.
702        ///
703        /// Receives any pending data from the socket but does not wait for new data
704        /// to arrive. On success, returns the number of bytes read. Because
705        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
706        /// the async task and can exist entirely on the stack.
707        ///
708        /// Usually, [`readable()`] or [`ready()`] is used with this function.
709        ///
710        /// [`readable()`]: TcpStream::readable()
711        /// [`ready()`]: TcpStream::ready()
712        ///
713        /// # Return
714        ///
715        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
716        /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
717        /// and will no longer yield data. If the stream is not ready to read data
718        /// `Err(io::ErrorKind::WouldBlock)` is returned.
719        ///
720        /// # Examples
721        ///
722        /// ```no_run
723        /// use tokio::net::TcpStream;
724        /// use std::error::Error;
725        /// use std::io;
726        ///
727        /// #[tokio::main]
728        /// async fn main() -> Result<(), Box<dyn Error>> {
729        ///     // Connect to a peer
730        ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
731        ///
732        ///     loop {
733        ///         // Wait for the socket to be readable
734        ///         stream.readable().await?;
735        ///
736        ///         let mut buf = Vec::with_capacity(4096);
737        ///
738        ///         // Try to read data, this may still fail with `WouldBlock`
739        ///         // if the readiness event is a false positive.
740        ///         match stream.try_read_buf(&mut buf) {
741        ///             Ok(0) => break,
742        ///             Ok(n) => {
743        ///                 println!("read {} bytes", n);
744        ///             }
745        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
746        ///                 continue;
747        ///             }
748        ///             Err(e) => {
749        ///                 return Err(e.into());
750        ///             }
751        ///         }
752        ///     }
753        ///
754        ///     Ok(())
755        /// }
756        /// ```
757        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
758            self.io.registration().try_io(Interest::READABLE, || {
759                use std::io::Read;
760
761                let dst = buf.chunk_mut();
762                let dst =
763                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
764
765                // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
766                // buffer.
767                let n = (&*self.io).read(dst)?;
768
769                unsafe {
770                    buf.advance_mut(n);
771                }
772
773                Ok(n)
774            })
775        }
776    }
777
778    /// Waits for the socket to become writable.
779    ///
780    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
781    /// paired with `try_write()`.
782    ///
783    /// # Cancel safety
784    ///
785    /// This method is cancel safe. Once a readiness event occurs, the method
786    /// will continue to return immediately until the readiness event is
787    /// consumed by an attempt to write that fails with `WouldBlock` or
788    /// `Poll::Pending`.
789    ///
790    /// # Examples
791    ///
792    /// ```no_run
793    /// use tokio::net::TcpStream;
794    /// use std::error::Error;
795    /// use std::io;
796    ///
797    /// #[tokio::main]
798    /// async fn main() -> Result<(), Box<dyn Error>> {
799    ///     // Connect to a peer
800    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
801    ///
802    ///     loop {
803    ///         // Wait for the socket to be writable
804    ///         stream.writable().await?;
805    ///
806    ///         // Try to write data, this may still fail with `WouldBlock`
807    ///         // if the readiness event is a false positive.
808    ///         match stream.try_write(b"hello world") {
809    ///             Ok(n) => {
810    ///                 break;
811    ///             }
812    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
813    ///                 continue;
814    ///             }
815    ///             Err(e) => {
816    ///                 return Err(e.into());
817    ///             }
818    ///         }
819    ///     }
820    ///
821    ///     Ok(())
822    /// }
823    /// ```
824    pub async fn writable(&self) -> io::Result<()> {
825        self.ready(Interest::WRITABLE).await?;
826        Ok(())
827    }
828
829    /// Polls for write readiness.
830    ///
831    /// If the tcp stream is not currently ready for writing, this method will
832    /// store a clone of the `Waker` from the provided `Context`. When the tcp
833    /// stream becomes ready for writing, `Waker::wake` will be called on the
834    /// waker.
835    ///
836    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
837    /// the `Waker` from the `Context` passed to the most recent call is
838    /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
839    /// second, independent waker.)
840    ///
841    /// This function is intended for cases where creating and pinning a future
842    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
843    /// preferred, as this supports polling from multiple tasks at once.
844    ///
845    /// # Return value
846    ///
847    /// The function returns:
848    ///
849    /// * `Poll::Pending` if the tcp stream is not ready for writing.
850    /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing.
851    /// * `Poll::Ready(Err(e))` if an error is encountered.
852    ///
853    /// # Errors
854    ///
855    /// This function may encounter any standard I/O error except `WouldBlock`.
856    ///
857    /// [`writable`]: method@Self::writable
858    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
859        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
860    }
861
    /// Tries to write a buffer to the stream, returning how many bytes were
    /// written.
864    ///
865    /// The function will attempt to write the entire contents of `buf`, but
866    /// only part of the buffer may be written.
867    ///
868    /// This function is usually paired with `writable()`.
869    ///
870    /// # Return
871    ///
872    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
873    /// number of bytes written. If the stream is not ready to write data,
874    /// `Err(io::ErrorKind::WouldBlock)` is returned.
875    ///
876    /// # Examples
877    ///
878    /// ```no_run
879    /// use tokio::net::TcpStream;
880    /// use std::error::Error;
881    /// use std::io;
882    ///
883    /// #[tokio::main]
884    /// async fn main() -> Result<(), Box<dyn Error>> {
885    ///     // Connect to a peer
886    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
887    ///
888    ///     loop {
889    ///         // Wait for the socket to be writable
890    ///         stream.writable().await?;
891    ///
892    ///         // Try to write data, this may still fail with `WouldBlock`
893    ///         // if the readiness event is a false positive.
894    ///         match stream.try_write(b"hello world") {
895    ///             Ok(n) => {
896    ///                 break;
897    ///             }
898    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
899    ///                 continue;
900    ///             }
901    ///             Err(e) => {
902    ///                 return Err(e.into());
903    ///             }
904    ///         }
905    ///     }
906    ///
907    ///     Ok(())
908    /// }
909    /// ```
910    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
911        use std::io::Write;
912
913        self.io
914            .registration()
915            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
916    }
917
918    /// Tries to write several buffers to the stream, returning how many bytes
919    /// were written.
920    ///
921    /// Data is written from each buffer in order, with the final buffer read
    /// from possibly being only partially consumed. This method behaves
923    /// equivalently to a single call to [`try_write()`] with concatenated
924    /// buffers.
925    ///
926    /// This function is usually paired with `writable()`.
927    ///
928    /// [`try_write()`]: TcpStream::try_write()
929    ///
930    /// # Return
931    ///
932    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
933    /// number of bytes written. If the stream is not ready to write data,
934    /// `Err(io::ErrorKind::WouldBlock)` is returned.
935    ///
936    /// # Examples
937    ///
938    /// ```no_run
939    /// use tokio::net::TcpStream;
940    /// use std::error::Error;
941    /// use std::io;
942    ///
943    /// #[tokio::main]
944    /// async fn main() -> Result<(), Box<dyn Error>> {
945    ///     // Connect to a peer
946    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
947    ///
948    ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
949    ///
950    ///     loop {
951    ///         // Wait for the socket to be writable
952    ///         stream.writable().await?;
953    ///
954    ///         // Try to write data, this may still fail with `WouldBlock`
955    ///         // if the readiness event is a false positive.
956    ///         match stream.try_write_vectored(&bufs) {
957    ///             Ok(n) => {
958    ///                 break;
959    ///             }
960    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
961    ///                 continue;
962    ///             }
963    ///             Err(e) => {
964    ///                 return Err(e.into());
965    ///             }
966    ///         }
967    ///     }
968    ///
969    ///     Ok(())
970    /// }
971    /// ```
972    pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
973        use std::io::Write;
974
975        self.io
976            .registration()
977            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
978    }
979
980    /// Tries to read or write from the socket using a user-provided IO operation.
981    ///
982    /// If the socket is ready, the provided closure is called. The closure
983    /// should attempt to perform IO operation on the socket by manually
984    /// calling the appropriate syscall. If the operation fails because the
985    /// socket is not actually ready, then the closure should return a
986    /// `WouldBlock` error and the readiness flag is cleared. The return value
987    /// of the closure is then returned by `try_io`.
988    ///
989    /// If the socket is not ready, then the closure is not called
990    /// and a `WouldBlock` error is returned.
991    ///
992    /// The closure should only return a `WouldBlock` error if it has performed
993    /// an IO operation on the socket that failed due to the socket not being
994    /// ready. Returning a `WouldBlock` error in any other situation will
995    /// incorrectly clear the readiness flag, which can cause the socket to
996    /// behave incorrectly.
997    ///
998    /// The closure should not perform the IO operation using any of the methods
999    /// defined on the Tokio `TcpStream` type, as this will mess with the
1000    /// readiness flag and can cause the socket to behave incorrectly.
1001    ///
1002    /// This method is not intended to be used with combined interests.
1003    /// The closure should perform only one type of IO operation, so it should not
1004    /// require more than one ready state. This method may panic or sleep forever
1005    /// if it is called with a combined interest.
1006    ///
1007    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1008    ///
1009    /// [`readable()`]: TcpStream::readable()
1010    /// [`writable()`]: TcpStream::writable()
1011    /// [`ready()`]: TcpStream::ready()
1012    pub fn try_io<R>(
1013        &self,
1014        interest: Interest,
1015        f: impl FnOnce() -> io::Result<R>,
1016    ) -> io::Result<R> {
1017        self.io
1018            .registration()
1019            .try_io(interest, || self.io.try_io(f))
1020    }
1021
1022    /// Reads or writes from the socket using a user-provided IO operation.
1023    ///
1024    /// The readiness of the socket is awaited and when the socket is ready,
1025    /// the provided closure is called. The closure should attempt to perform
1026    /// IO operation on the socket by manually calling the appropriate syscall.
1027    /// If the operation fails because the socket is not actually ready,
1028    /// then the closure should return a `WouldBlock` error. In such case the
1029    /// readiness flag is cleared and the socket readiness is awaited again.
1030    /// This loop is repeated until the closure returns an `Ok` or an error
1031    /// other than `WouldBlock`.
1032    ///
1033    /// The closure should only return a `WouldBlock` error if it has performed
1034    /// an IO operation on the socket that failed due to the socket not being
1035    /// ready. Returning a `WouldBlock` error in any other situation will
1036    /// incorrectly clear the readiness flag, which can cause the socket to
1037    /// behave incorrectly.
1038    ///
1039    /// The closure should not perform the IO operation using any of the methods
1040    /// defined on the Tokio `TcpStream` type, as this will mess with the
1041    /// readiness flag and can cause the socket to behave incorrectly.
1042    ///
1043    /// This method is not intended to be used with combined interests.
1044    /// The closure should perform only one type of IO operation, so it should not
1045    /// require more than one ready state. This method may panic or sleep forever
1046    /// if it is called with a combined interest.
1047    pub async fn async_io<R>(
1048        &self,
1049        interest: Interest,
1050        mut f: impl FnMut() -> io::Result<R>,
1051    ) -> io::Result<R> {
1052        self.io
1053            .registration()
1054            .async_io(interest, || self.io.try_io(&mut f))
1055            .await
1056    }
1057
1058    /// Receives data on the socket from the remote address to which it is
1059    /// connected, without removing that data from the queue. On success,
1060    /// returns the number of bytes peeked.
1061    ///
1062    /// Successive calls return the same data. This is accomplished by passing
1063    /// `MSG_PEEK` as a flag to the underlying `recv` system call.
1064    ///
1065    /// # Examples
1066    ///
1067    /// ```no_run
1068    /// use tokio::net::TcpStream;
1069    /// use tokio::io::AsyncReadExt;
1070    /// use std::error::Error;
1071    ///
1072    /// #[tokio::main]
1073    /// async fn main() -> Result<(), Box<dyn Error>> {
1074    ///     // Connect to a peer
1075    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
1076    ///
1077    ///     let mut b1 = [0; 10];
1078    ///     let mut b2 = [0; 10];
1079    ///
1080    ///     // Peek at the data
1081    ///     let n = stream.peek(&mut b1).await?;
1082    ///
1083    ///     // Read the data
1084    ///     assert_eq!(n, stream.read(&mut b2[..n]).await?);
1085    ///     assert_eq!(&b1[..n], &b2[..n]);
1086    ///
1087    ///     Ok(())
1088    /// }
1089    /// ```
1090    ///
1091    /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
1092    ///
1093    /// [`read`]: fn@crate::io::AsyncReadExt::read
1094    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
1095    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1096        self.io
1097            .registration()
1098            .async_io(Interest::READABLE, || self.io.peek(buf))
1099            .await
1100    }
1101
1102    /// Shuts down the read, write, or both halves of this connection.
1103    ///
1104    /// This function will cause all pending and future I/O on the specified
1105    /// portions to return immediately with an appropriate value (see the
1106    /// documentation of `Shutdown`).
1107    pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1108        self.io.shutdown(how)
1109    }
1110
1111    /// Gets the value of the `TCP_NODELAY` option on this socket.
1112    ///
1113    /// For more information about this option, see [`set_nodelay`].
1114    ///
1115    /// [`set_nodelay`]: TcpStream::set_nodelay
1116    ///
1117    /// # Examples
1118    ///
1119    /// ```no_run
1120    /// use tokio::net::TcpStream;
1121    ///
1122    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1123    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1124    ///
1125    /// println!("{:?}", stream.nodelay()?);
1126    /// # Ok(())
1127    /// # }
1128    /// ```
1129    pub fn nodelay(&self) -> io::Result<bool> {
1130        self.io.nodelay()
1131    }
1132
1133    /// Sets the value of the `TCP_NODELAY` option on this socket.
1134    ///
1135    /// If set, this option disables the Nagle algorithm. This means that
1136    /// segments are always sent as soon as possible, even if there is only a
1137    /// small amount of data. When not set, data is buffered until there is a
1138    /// sufficient amount to send out, thereby avoiding the frequent sending of
1139    /// small packets.
1140    ///
1141    /// # Examples
1142    ///
1143    /// ```no_run
1144    /// use tokio::net::TcpStream;
1145    ///
1146    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1147    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1148    ///
1149    /// stream.set_nodelay(true)?;
1150    /// # Ok(())
1151    /// # }
1152    /// ```
1153    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
1154        self.io.set_nodelay(nodelay)
1155    }
1156
1157    cfg_not_wasi! {
1158        /// Reads the linger duration for this socket by getting the `SO_LINGER`
1159        /// option.
1160        ///
1161        /// For more information about this option, see [`set_linger`].
1162        ///
1163        /// [`set_linger`]: TcpStream::set_linger
1164        ///
1165        /// # Examples
1166        ///
1167        /// ```no_run
1168        /// use tokio::net::TcpStream;
1169        ///
1170        /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1171        /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1172        ///
1173        /// println!("{:?}", stream.linger()?);
1174        /// # Ok(())
1175        /// # }
1176        /// ```
1177        pub fn linger(&self) -> io::Result<Option<Duration>> {
1178            socket2::SockRef::from(self).linger()
1179        }
1180
1181        /// Sets the linger duration of this socket by setting the `SO_LINGER` option.
1182        ///
1183        /// This option controls the action taken when a stream has unsent messages and the stream is
1184        /// closed. If `SO_LINGER` is set, the system shall block the process until it can transmit the
1185        /// data or until the time expires.
1186        ///
1187        /// If `SO_LINGER` is not specified, and the stream is closed, the system handles the call in a
1188        /// way that allows the process to continue as quickly as possible.
1189        ///
1190        /// # Examples
1191        ///
1192        /// ```no_run
1193        /// use tokio::net::TcpStream;
1194        ///
1195        /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1196        /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1197        ///
1198        /// stream.set_linger(None)?;
1199        /// # Ok(())
1200        /// # }
1201        /// ```
1202        pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
1203            socket2::SockRef::from(self).set_linger(dur)
1204        }
1205    }
1206
1207    /// Gets the value of the `IP_TTL` option for this socket.
1208    ///
1209    /// For more information about this option, see [`set_ttl`].
1210    ///
1211    /// [`set_ttl`]: TcpStream::set_ttl
1212    ///
1213    /// # Examples
1214    ///
1215    /// ```no_run
1216    /// use tokio::net::TcpStream;
1217    ///
1218    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1219    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1220    ///
1221    /// println!("{:?}", stream.ttl()?);
1222    /// # Ok(())
1223    /// # }
1224    /// ```
1225    pub fn ttl(&self) -> io::Result<u32> {
1226        self.io.ttl()
1227    }
1228
1229    /// Sets the value for the `IP_TTL` option on this socket.
1230    ///
1231    /// This value sets the time-to-live field that is used in every packet sent
1232    /// from this socket.
1233    ///
1234    /// # Examples
1235    ///
1236    /// ```no_run
1237    /// use tokio::net::TcpStream;
1238    ///
1239    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1240    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1241    ///
1242    /// stream.set_ttl(123)?;
1243    /// # Ok(())
1244    /// # }
1245    /// ```
1246    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1247        self.io.set_ttl(ttl)
1248    }
1249
1250    // These lifetime markers also appear in the generated documentation, and make
1251    // it more clear that this is a *borrowed* split.
1252    #[allow(clippy::needless_lifetimes)]
1253    /// Splits a `TcpStream` into a read half and a write half, which can be used
1254    /// to read and write the stream concurrently.
1255    ///
1256    /// This method is more efficient than [`into_split`], but the halves cannot be
1257    /// moved into independently spawned tasks.
1258    ///
1259    /// [`into_split`]: TcpStream::into_split()
1260    pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
1261        split(self)
1262    }
1263
1264    /// Splits a `TcpStream` into a read half and a write half, which can be used
1265    /// to read and write the stream concurrently.
1266    ///
1267    /// Unlike [`split`], the owned halves can be moved to separate tasks, however
1268    /// this comes at the cost of a heap allocation.
1269    ///
1270    /// **Note:** Dropping the write half will shut down the write half of the TCP
1271    /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
1272    ///
1273    /// [`split`]: TcpStream::split()
1274    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
1275    pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
1276        split_owned(self)
1277    }
1278
    // == Poll IO functions that take `&self` ==
    //
    // To read or write without mutable access to the `TcpStream`, combine the
    // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
    // `try_write` methods.
1284
1285    pub(crate) fn poll_read_priv(
1286        &self,
1287        cx: &mut Context<'_>,
1288        buf: &mut ReadBuf<'_>,
1289    ) -> Poll<io::Result<()>> {
1290        // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
1291        unsafe { self.io.poll_read(cx, buf) }
1292    }
1293
1294    pub(super) fn poll_write_priv(
1295        &self,
1296        cx: &mut Context<'_>,
1297        buf: &[u8],
1298    ) -> Poll<io::Result<usize>> {
1299        self.io.poll_write(cx, buf)
1300    }
1301
1302    pub(super) fn poll_write_vectored_priv(
1303        &self,
1304        cx: &mut Context<'_>,
1305        bufs: &[io::IoSlice<'_>],
1306    ) -> Poll<io::Result<usize>> {
1307        self.io.poll_write_vectored(cx, bufs)
1308    }
1309}
1310
1311impl TryFrom<std::net::TcpStream> for TcpStream {
1312    type Error = io::Error;
1313
1314    /// Consumes stream, returning the tokio I/O object.
1315    ///
1316    /// This is equivalent to
1317    /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
1318    fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
1319        Self::from_std(stream)
1320    }
1321}
1322
1323// ===== impl Read / Write =====
1324
1325impl AsyncRead for TcpStream {
1326    fn poll_read(
1327        self: Pin<&mut Self>,
1328        cx: &mut Context<'_>,
1329        buf: &mut ReadBuf<'_>,
1330    ) -> Poll<io::Result<()>> {
1331        self.poll_read_priv(cx, buf)
1332    }
1333}
1334
1335impl AsyncWrite for TcpStream {
1336    fn poll_write(
1337        self: Pin<&mut Self>,
1338        cx: &mut Context<'_>,
1339        buf: &[u8],
1340    ) -> Poll<io::Result<usize>> {
1341        self.poll_write_priv(cx, buf)
1342    }
1343
1344    fn poll_write_vectored(
1345        self: Pin<&mut Self>,
1346        cx: &mut Context<'_>,
1347        bufs: &[io::IoSlice<'_>],
1348    ) -> Poll<io::Result<usize>> {
1349        self.poll_write_vectored_priv(cx, bufs)
1350    }
1351
1352    fn is_write_vectored(&self) -> bool {
1353        true
1354    }
1355
1356    #[inline]
1357    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1358        // tcp flush is a no-op
1359        Poll::Ready(Ok(()))
1360    }
1361
1362    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1363        self.shutdown_std(std::net::Shutdown::Write)?;
1364        Poll::Ready(Ok(()))
1365    }
1366}
1367
1368impl fmt::Debug for TcpStream {
1369    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1370        self.io.fmt(f)
1371    }
1372}
1373
1374#[cfg(unix)]
1375mod sys {
1376    use super::TcpStream;
1377    use std::os::unix::prelude::*;
1378
1379    impl AsRawFd for TcpStream {
1380        fn as_raw_fd(&self) -> RawFd {
1381            self.io.as_raw_fd()
1382        }
1383    }
1384
1385    impl AsFd for TcpStream {
1386        fn as_fd(&self) -> BorrowedFd<'_> {
1387            unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1388        }
1389    }
1390}
1391
1392cfg_windows! {
1393    use crate::os::windows::io::{AsRawSocket, RawSocket, AsSocket, BorrowedSocket};
1394
1395    impl AsRawSocket for TcpStream {
1396        fn as_raw_socket(&self) -> RawSocket {
1397            self.io.as_raw_socket()
1398        }
1399    }
1400
1401    impl AsSocket for TcpStream {
1402        fn as_socket(&self) -> BorrowedSocket<'_> {
1403            unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
1404        }
1405    }
1406}
1407
#[cfg(all(tokio_unstable, target_os = "wasi"))]
mod sys {
    use super::TcpStream;
    use std::os::wasi::prelude::*;

    impl AsRawFd for TcpStream {
        fn as_raw_fd(&self) -> RawFd {
            // Expose the descriptor of the wrapped socket.
            self.io.as_raw_fd()
        }
    }

    impl AsFd for TcpStream {
        fn as_fd(&self) -> BorrowedFd<'_> {
            let raw = self.as_raw_fd();
            // SAFETY: the returned `BorrowedFd` is tied to the borrow of
            // `self`. NOTE(review): this presumes the descriptor stays open
            // for as long as the stream is alive; confirm against the owner
            // of the socket.
            unsafe { BorrowedFd::borrow_raw(raw) }
        }
    }
}