broker_tokio/net/tcp/
stream.rs

1use crate::future::poll_fn;
2use crate::io::{AsyncRead, AsyncWrite, PollEvented};
3use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
4use crate::net::ToSocketAddrs;
5
6use bytes::Buf;
7use iovec::IoVec;
8use std::convert::TryFrom;
9use std::fmt;
10use std::io::{self, Read, Write};
11use std::mem::MaybeUninit;
12use std::net::{self, Shutdown, SocketAddr};
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use std::time::Duration;
16
17cfg_tcp! {
18    /// A TCP stream between a local and a remote socket.
19    ///
20    /// A TCP stream can either be created by connecting to an endpoint, via the
21    /// [`connect`] method, or by [accepting] a connection from a [listener].
22    ///
23    /// [`connect`]: struct.TcpStream.html#method.connect
24    /// [accepting]: struct.TcpListener.html#method.accept
25    /// [listener]: struct.TcpListener.html
26    ///
27    /// # Examples
28    ///
29    /// ```no_run
30    /// use tokio::net::TcpStream;
31    /// use tokio::prelude::*;
32    /// use std::error::Error;
33    ///
34    /// #[tokio::main]
35    /// async fn main() -> Result<(), Box<dyn Error>> {
36    ///     // Connect to a peer
37    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
38    ///
39    ///     // Write some data.
40    ///     stream.write_all(b"hello world!").await?;
41    ///
42    ///     Ok(())
43    /// }
44    /// ```
45    pub struct TcpStream {
46        io: PollEvented<mio::net::TcpStream>,
47    }
48}
49
50impl TcpStream {
51    /// Opens a TCP connection to a remote host.
52    ///
53    /// `addr` is an address of the remote host. Anything which implements
54    /// `ToSocketAddrs` trait can be supplied for the address.
55    ///
56    /// If `addr` yields multiple addresses, connect will be attempted with each
57    /// of the addresses until a connection is successful. If none of the
58    /// addresses result in a successful connection, the error returned from the
59    /// last connection attempt (the last address) is returned.
60    ///
61    /// # Examples
62    ///
63    /// ```no_run
64    /// use tokio::net::TcpStream;
65    /// use tokio::prelude::*;
66    /// use std::error::Error;
67    ///
68    /// #[tokio::main]
69    /// async fn main() -> Result<(), Box<dyn Error>> {
70    ///     // Connect to a peer
71    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
72    ///
73    ///     // Write some data.
74    ///     stream.write_all(b"hello world!").await?;
75    ///
76    ///     Ok(())
77    /// }
78    /// ```
79    pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
80        let addrs = addr.to_socket_addrs().await?;
81
82        let mut last_err = None;
83
84        for addr in addrs {
85            match TcpStream::connect_addr(addr).await {
86                Ok(stream) => return Ok(stream),
87                Err(e) => last_err = Some(e),
88            }
89        }
90
91        Err(last_err.unwrap_or_else(|| {
92            io::Error::new(
93                io::ErrorKind::InvalidInput,
94                "could not resolve to any addresses",
95            )
96        }))
97    }
98
99    /// Establish a connection to the specified `addr`.
100    async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
101        let sys = mio::net::TcpStream::connect(&addr)?;
102        let stream = TcpStream::new(sys)?;
103
104        // Once we've connected, wait for the stream to be writable as
105        // that's when the actual connection has been initiated. Once we're
106        // writable we check for `take_socket_error` to see if the connect
107        // actually hit an error or not.
108        //
109        // If all that succeeded then we ship everything on up.
110        poll_fn(|cx| stream.io.poll_write_ready(cx)).await?;
111
112        if let Some(e) = stream.io.get_ref().take_error()? {
113            return Err(e);
114        }
115
116        Ok(stream)
117    }
118
119    pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
120        let io = PollEvented::new(connected)?;
121        Ok(TcpStream { io })
122    }
123
124    /// Create a new `TcpStream` from a `std::net::TcpStream`.
125    ///
126    /// This function will convert a TCP stream created by the standard library
127    /// to a TCP stream ready to be used with the provided event loop handle.
128    ///
129    /// # Examples
130    ///
131    /// ```rust,no_run
132    /// use std::error::Error;
133    /// use tokio::net::TcpStream;
134    ///
135    /// #[tokio::main]
136    /// async fn main() -> Result<(), Box<dyn Error>> {
137    ///     let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
138    ///     let stream = TcpStream::from_std(std_stream)?;
139    ///     Ok(())
140    /// }
141    /// ```
142    ///
143    /// # Panics
144    ///
145    /// This function panics if thread-local runtime is not set.
146    ///
147    /// The runtime is usually set implicitly when this function is called
148    /// from a future driven by a tokio runtime, otherwise runtime can be set
149    /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
150    ///
151    /// # Panics
152    ///
153    /// This function panics if thread-local runtime is not set.
154    ///
155    /// The runtime is usually set implicitly when this function is called
156    /// from a future driven by a tokio runtime, otherwise runtime can be set
157    /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
158    pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> {
159        let io = mio::net::TcpStream::from_stream(stream)?;
160        let io = PollEvented::new(io)?;
161        Ok(TcpStream { io })
162    }
163
164    // Connect a TcpStream asynchronously that may be built with a net2 TcpBuilder.
165    //
166    // This should be removed in favor of some in-crate TcpSocket builder API.
167    #[doc(hidden)]
168    pub async fn connect_std(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
169        let io = mio::net::TcpStream::connect_stream(stream, addr)?;
170        let io = PollEvented::new(io)?;
171        let stream = TcpStream { io };
172
173        // Once we've connected, wait for the stream to be writable as
174        // that's when the actual connection has been initiated. Once we're
175        // writable we check for `take_socket_error` to see if the connect
176        // actually hit an error or not.
177        //
178        // If all that succeeded then we ship everything on up.
179        poll_fn(|cx| stream.io.poll_write_ready(cx)).await?;
180
181        if let Some(e) = stream.io.get_ref().take_error()? {
182            return Err(e);
183        }
184
185        Ok(stream)
186    }
187
188    /// Returns the local address that this stream is bound to.
189    ///
190    /// # Examples
191    ///
192    /// ```no_run
193    /// use tokio::net::TcpStream;
194    ///
195    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
196    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
197    ///
198    /// println!("{:?}", stream.local_addr()?);
199    /// # Ok(())
200    /// # }
201    /// ```
202    pub fn local_addr(&self) -> io::Result<SocketAddr> {
203        self.io.get_ref().local_addr()
204    }
205
206    /// Returns the remote address that this stream is connected to.
207    ///
208    /// # Examples
209    ///
210    /// ```no_run
211    /// use tokio::net::TcpStream;
212    ///
213    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
214    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
215    ///
216    /// println!("{:?}", stream.peer_addr()?);
217    /// # Ok(())
218    /// # }
219    /// ```
220    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
221        self.io.get_ref().peer_addr()
222    }
223
224    /// Attempt to receive data on the socket, without removing that data from
225    /// the queue, registering the current task for wakeup if data is not yet
226    /// available.
227    ///
228    /// # Return value
229    ///
230    /// The function returns:
231    ///
232    /// * `Poll::Pending` if data is not yet available.
233    /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
234    /// * `Poll::Ready(Err(e))` if an error is encountered.
235    ///
236    /// # Errors
237    ///
238    /// This function may encounter any standard I/O error except `WouldBlock`.
239    ///
240    /// # Examples
241    ///
242    /// ```no_run
243    /// use tokio::io;
244    /// use tokio::net::TcpStream;
245    ///
246    /// use futures::future::poll_fn;
247    ///
248    /// #[tokio::main]
249    /// async fn main() -> io::Result<()> {
250    ///     let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
251    ///     let mut buf = [0; 10];
252    ///
253    ///     poll_fn(|cx| {
254    ///         stream.poll_peek(cx, &mut buf)
255    ///     }).await?;
256    ///
257    ///     Ok(())
258    /// }
259    /// ```
260    pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
261        self.poll_peek2(cx, buf)
262    }
263
264    pub(super) fn poll_peek2(
265        &self,
266        cx: &mut Context<'_>,
267        buf: &mut [u8],
268    ) -> Poll<io::Result<usize>> {
269        ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
270
271        match self.io.get_ref().peek(buf) {
272            Ok(ret) => Poll::Ready(Ok(ret)),
273            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
274                self.io.clear_read_ready(cx, mio::Ready::readable())?;
275                Poll::Pending
276            }
277            Err(e) => Poll::Ready(Err(e)),
278        }
279    }
280
281    /// Receives data on the socket from the remote address to which it is
282    /// connected, without removing that data from the queue. On success,
283    /// returns the number of bytes peeked.
284    ///
285    /// Successive calls return the same data. This is accomplished by passing
286    /// `MSG_PEEK` as a flag to the underlying recv system call.
287    ///
288    /// # Examples
289    ///
290    /// ```no_run
291    /// use tokio::net::TcpStream;
292    /// use tokio::prelude::*;
293    /// use std::error::Error;
294    ///
295    /// #[tokio::main]
296    /// async fn main() -> Result<(), Box<dyn Error>> {
297    ///     // Connect to a peer
298    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
299    ///
300    ///     let mut b1 = [0; 10];
301    ///     let mut b2 = [0; 10];
302    ///
303    ///     // Peek at the data
304    ///     let n = stream.peek(&mut b1).await?;
305    ///
306    ///     // Read the data
307    ///     assert_eq!(n, stream.read(&mut b2[..n]).await?);
308    ///     assert_eq!(&b1[..n], &b2[..n]);
309    ///
310    ///     Ok(())
311    /// }
312    /// ```
313    pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
314        poll_fn(|cx| self.poll_peek(cx, buf)).await
315    }
316
317    /// Shuts down the read, write, or both halves of this connection.
318    ///
319    /// This function will cause all pending and future I/O on the specified
320    /// portions to return immediately with an appropriate value (see the
321    /// documentation of `Shutdown`).
322    ///
323    /// # Examples
324    ///
325    /// ```no_run
326    /// use tokio::net::TcpStream;
327    /// use std::error::Error;
328    /// use std::net::Shutdown;
329    ///
330    /// #[tokio::main]
331    /// async fn main() -> Result<(), Box<dyn Error>> {
332    ///     // Connect to a peer
333    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
334    ///
335    ///     // Shutdown the stream
336    ///     stream.shutdown(Shutdown::Write)?;
337    ///
338    ///     Ok(())
339    /// }
340    /// ```
341    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
342        self.io.get_ref().shutdown(how)
343    }
344
345    /// Gets the value of the `TCP_NODELAY` option on this socket.
346    ///
347    /// For more information about this option, see [`set_nodelay`].
348    ///
349    /// [`set_nodelay`]: #method.set_nodelay
350    ///
351    /// # Examples
352    ///
353    /// ```no_run
354    /// use tokio::net::TcpStream;
355    ///
356    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
357    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
358    ///
359    /// println!("{:?}", stream.nodelay()?);
360    /// # Ok(())
361    /// # }
362    /// ```
363    pub fn nodelay(&self) -> io::Result<bool> {
364        self.io.get_ref().nodelay()
365    }
366
367    /// Sets the value of the `TCP_NODELAY` option on this socket.
368    ///
369    /// If set, this option disables the Nagle algorithm. This means that
370    /// segments are always sent as soon as possible, even if there is only a
371    /// small amount of data. When not set, data is buffered until there is a
372    /// sufficient amount to send out, thereby avoiding the frequent sending of
373    /// small packets.
374    ///
375    /// # Examples
376    ///
377    /// ```no_run
378    /// use tokio::net::TcpStream;
379    ///
380    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
381    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
382    ///
383    /// stream.set_nodelay(true)?;
384    /// # Ok(())
385    /// # }
386    /// ```
387    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
388        self.io.get_ref().set_nodelay(nodelay)
389    }
390
391    /// Gets the value of the `SO_RCVBUF` option on this socket.
392    ///
393    /// For more information about this option, see [`set_recv_buffer_size`].
394    ///
395    /// [`set_recv_buffer_size`]: #tymethod.set_recv_buffer_size
396    ///
397    /// # Examples
398    ///
399    /// ```no_run
400    /// use tokio::net::TcpStream;
401    ///
402    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
403    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
404    ///
405    /// println!("{:?}", stream.recv_buffer_size()?);
406    /// # Ok(())
407    /// # }
408    /// ```
409    pub fn recv_buffer_size(&self) -> io::Result<usize> {
410        self.io.get_ref().recv_buffer_size()
411    }
412
413    /// Sets the value of the `SO_RCVBUF` option on this socket.
414    ///
415    /// Changes the size of the operating system's receive buffer associated
416    /// with the socket.
417    ///
418    /// # Examples
419    ///
420    /// ```no_run
421    /// use tokio::net::TcpStream;
422    ///
423    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
424    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
425    ///
426    /// stream.set_recv_buffer_size(100)?;
427    /// # Ok(())
428    /// # }
429    /// ```
430    pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
431        self.io.get_ref().set_recv_buffer_size(size)
432    }
433
434    /// Gets the value of the `SO_SNDBUF` option on this socket.
435    ///
436    /// For more information about this option, see [`set_send_buffer`].
437    ///
438    /// [`set_send_buffer`]: #tymethod.set_send_buffer
439    ///
440    /// # Examples
441    ///
442    /// Returns whether keepalive messages are enabled on this socket, and if so
443    /// the duration of time between them.
444    ///
445    /// For more information about this option, see [`set_keepalive`].
446    ///
447    /// [`set_keepalive`]: #tymethod.set_keepalive
448    ///
449    /// # Examples
450    ///
451    /// ```no_run
452    /// use tokio::net::TcpStream;
453    ///
454    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
455    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
456    ///
457    /// println!("{:?}", stream.send_buffer_size()?);
458    /// # Ok(())
459    /// # }
460    /// ```
461    pub fn send_buffer_size(&self) -> io::Result<usize> {
462        self.io.get_ref().send_buffer_size()
463    }
464
465    /// Sets the value of the `SO_SNDBUF` option on this socket.
466    ///
467    /// Changes the size of the operating system's send buffer associated with
468    /// the socket.
469    ///
470    /// # Examples
471    ///
472    /// ```no_run
473    /// use tokio::net::TcpStream;
474    ///
475    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
476    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
477    ///
478    /// stream.set_send_buffer_size(100)?;
479    /// # Ok(())
480    /// # }
481    /// ```
482    pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
483        self.io.get_ref().set_send_buffer_size(size)
484    }
485
486    /// Returns whether keepalive messages are enabled on this socket, and if so
487    /// the duration of time between them.
488    ///
489    /// For more information about this option, see [`set_keepalive`].
490    ///
491    /// [`set_keepalive`]: #tymethod.set_keepalive
492    ///
493    /// # Examples
494    ///
495    /// ```no_run
496    /// use tokio::net::TcpStream;
497    ///
498    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
499    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
500    ///
501    /// println!("{:?}", stream.keepalive()?);
502    /// # Ok(())
503    /// # }
504    /// ```
505    pub fn keepalive(&self) -> io::Result<Option<Duration>> {
506        self.io.get_ref().keepalive()
507    }
508
509    /// Sets whether keepalive messages are enabled to be sent on this socket.
510    ///
511    /// On Unix, this option will set the `SO_KEEPALIVE` as well as the
512    /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform).
513    /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option.
514    ///
515    /// If `None` is specified then keepalive messages are disabled, otherwise
516    /// the duration specified will be the time to remain idle before sending a
517    /// TCP keepalive probe.
518    ///
519    /// Some platforms specify this value in seconds, so sub-second
520    /// specifications may be omitted.
521    ///
522    /// # Examples
523    ///
524    /// ```no_run
525    /// use tokio::net::TcpStream;
526    ///
527    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
528    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
529    ///
530    /// stream.set_keepalive(None)?;
531    /// # Ok(())
532    /// # }
533    /// ```
534    pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
535        self.io.get_ref().set_keepalive(keepalive)
536    }
537
538    /// Gets the value of the `IP_TTL` option for this socket.
539    ///
540    /// For more information about this option, see [`set_ttl`].
541    ///
542    /// [`set_ttl`]: #tymethod.set_ttl
543    ///
544    /// # Examples
545    ///
546    /// ```no_run
547    /// use tokio::net::TcpStream;
548    ///
549    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
550    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
551    ///
552    /// println!("{:?}", stream.ttl()?);
553    /// # Ok(())
554    /// # }
555    /// ```
556    pub fn ttl(&self) -> io::Result<u32> {
557        self.io.get_ref().ttl()
558    }
559
560    /// Sets the value for the `IP_TTL` option on this socket.
561    ///
562    /// This value sets the time-to-live field that is used in every packet sent
563    /// from this socket.
564    ///
565    /// # Examples
566    ///
567    /// ```no_run
568    /// use tokio::net::TcpStream;
569    ///
570    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
571    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
572    ///
573    /// stream.set_ttl(123)?;
574    /// # Ok(())
575    /// # }
576    /// ```
577    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
578        self.io.get_ref().set_ttl(ttl)
579    }
580
581    /// Reads the linger duration for this socket by getting the `SO_LINGER`
582    /// option.
583    ///
584    /// For more information about this option, see [`set_linger`].
585    ///
586    /// [`set_linger`]: #tymethod.set_linger
587    ///
588    /// # Examples
589    ///
590    /// ```no_run
591    /// use tokio::net::TcpStream;
592    ///
593    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
594    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
595    ///
596    /// println!("{:?}", stream.linger()?);
597    /// # Ok(())
598    /// # }
599    /// ```
600    pub fn linger(&self) -> io::Result<Option<Duration>> {
601        self.io.get_ref().linger()
602    }
603
604    /// Sets the linger duration of this socket by setting the `SO_LINGER`
605    /// option.
606    ///
607    /// This option controls the action taken when a stream has unsent messages
608    /// and the stream is closed. If `SO_LINGER` is set, the system
609    /// shall block the process until it can transmit the data or until the
610    /// time expires.
611    ///
612    /// If `SO_LINGER` is not specified, and the stream is closed, the system
613    /// handles the call in a way that allows the process to continue as quickly
614    /// as possible.
615    ///
616    /// # Examples
617    ///
618    /// ```no_run
619    /// use tokio::net::TcpStream;
620    ///
621    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
622    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
623    ///
624    /// stream.set_linger(None)?;
625    /// # Ok(())
626    /// # }
627    /// ```
628    pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
629        self.io.get_ref().set_linger(dur)
630    }
631
632    /// Split a `TcpStream` into a read half and a write half, which can be used
633    /// to read and write the stream concurrently.
634    ///
635    /// See the module level documenation of [`split`](super::split) for more
636    /// details.
637    pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
638        split(self)
639    }
640
641    // == Poll IO functions that takes `&self` ==
642    //
643    // They are not public because (taken from the doc of `PollEvented`):
644    //
645    // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
646    // caller must ensure that there are at most two tasks that use a
647    // `PollEvented` instance concurrently. One for reading and one for writing.
648    // While violating this requirement is "safe" from a Rust memory model point
649    // of view, it will result in unexpected behavior in the form of lost
650    // notifications and tasks hanging.
651
652    pub(crate) fn poll_read_priv(
653        &self,
654        cx: &mut Context<'_>,
655        buf: &mut [u8],
656    ) -> Poll<io::Result<usize>> {
657        ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
658
659        match self.io.get_ref().read(buf) {
660            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
661                self.io.clear_read_ready(cx, mio::Ready::readable())?;
662                Poll::Pending
663            }
664            x => Poll::Ready(x),
665        }
666    }
667
668    pub(super) fn poll_write_priv(
669        &self,
670        cx: &mut Context<'_>,
671        buf: &[u8],
672    ) -> Poll<io::Result<usize>> {
673        ready!(self.io.poll_write_ready(cx))?;
674
675        match self.io.get_ref().write(buf) {
676            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
677                self.io.clear_write_ready(cx)?;
678                Poll::Pending
679            }
680            x => Poll::Ready(x),
681        }
682    }
683
684    pub(super) fn poll_write_buf_priv<B: Buf>(
685        &self,
686        cx: &mut Context<'_>,
687        buf: &mut B,
688    ) -> Poll<io::Result<usize>> {
689        use std::io::IoSlice;
690
691        ready!(self.io.poll_write_ready(cx))?;
692
693        // The `IoVec` (v0.1.x) type can't have a zero-length size, so create
694        // a dummy version from a 1-length slice which we'll overwrite with
695        // the `bytes_vectored` method.
696        static S: &[u8] = &[0];
697        const MAX_BUFS: usize = 64;
698
699        // IoSlice isn't Copy, so we must expand this manually ;_;
700        let mut slices: [IoSlice<'_>; MAX_BUFS] = [
701            IoSlice::new(S),
702            IoSlice::new(S),
703            IoSlice::new(S),
704            IoSlice::new(S),
705            IoSlice::new(S),
706            IoSlice::new(S),
707            IoSlice::new(S),
708            IoSlice::new(S),
709            IoSlice::new(S),
710            IoSlice::new(S),
711            IoSlice::new(S),
712            IoSlice::new(S),
713            IoSlice::new(S),
714            IoSlice::new(S),
715            IoSlice::new(S),
716            IoSlice::new(S),
717            IoSlice::new(S),
718            IoSlice::new(S),
719            IoSlice::new(S),
720            IoSlice::new(S),
721            IoSlice::new(S),
722            IoSlice::new(S),
723            IoSlice::new(S),
724            IoSlice::new(S),
725            IoSlice::new(S),
726            IoSlice::new(S),
727            IoSlice::new(S),
728            IoSlice::new(S),
729            IoSlice::new(S),
730            IoSlice::new(S),
731            IoSlice::new(S),
732            IoSlice::new(S),
733            IoSlice::new(S),
734            IoSlice::new(S),
735            IoSlice::new(S),
736            IoSlice::new(S),
737            IoSlice::new(S),
738            IoSlice::new(S),
739            IoSlice::new(S),
740            IoSlice::new(S),
741            IoSlice::new(S),
742            IoSlice::new(S),
743            IoSlice::new(S),
744            IoSlice::new(S),
745            IoSlice::new(S),
746            IoSlice::new(S),
747            IoSlice::new(S),
748            IoSlice::new(S),
749            IoSlice::new(S),
750            IoSlice::new(S),
751            IoSlice::new(S),
752            IoSlice::new(S),
753            IoSlice::new(S),
754            IoSlice::new(S),
755            IoSlice::new(S),
756            IoSlice::new(S),
757            IoSlice::new(S),
758            IoSlice::new(S),
759            IoSlice::new(S),
760            IoSlice::new(S),
761            IoSlice::new(S),
762            IoSlice::new(S),
763            IoSlice::new(S),
764            IoSlice::new(S),
765        ];
766        let cnt = buf.bytes_vectored(&mut slices);
767
768        let iovec = <&IoVec>::from(S);
769        let mut vecs = [iovec; MAX_BUFS];
770        for i in 0..cnt {
771            vecs[i] = (*slices[i]).into();
772        }
773
774        match self.io.get_ref().write_bufs(&vecs[..cnt]) {
775            Ok(n) => {
776                buf.advance(n);
777                Poll::Ready(Ok(n))
778            }
779            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
780                self.io.clear_write_ready(cx)?;
781                Poll::Pending
782            }
783            Err(e) => Poll::Ready(Err(e)),
784        }
785    }
786}
787
788impl TryFrom<TcpStream> for mio::net::TcpStream {
789    type Error = io::Error;
790
791    /// Consumes value, returning the mio I/O object.
792    ///
793    /// See [`PollEvented::into_inner`] for more details about
794    /// resource deregistration that happens during the call.
795    ///
796    /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
797    fn try_from(value: TcpStream) -> Result<Self, Self::Error> {
798        value.io.into_inner()
799    }
800}
801
802impl TryFrom<net::TcpStream> for TcpStream {
803    type Error = io::Error;
804
805    /// Consumes stream, returning the tokio I/O object.
806    ///
807    /// This is equivalent to
808    /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
809    fn try_from(stream: net::TcpStream) -> Result<Self, Self::Error> {
810        Self::from_std(stream)
811    }
812}
813
814// ===== impl Read / Write =====
815
816impl AsyncRead for TcpStream {
817    unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
818        false
819    }
820
821    fn poll_read(
822        self: Pin<&mut Self>,
823        cx: &mut Context<'_>,
824        buf: &mut [u8],
825    ) -> Poll<io::Result<usize>> {
826        self.poll_read_priv(cx, buf)
827    }
828}
829
830impl AsyncWrite for TcpStream {
831    fn poll_write(
832        self: Pin<&mut Self>,
833        cx: &mut Context<'_>,
834        buf: &[u8],
835    ) -> Poll<io::Result<usize>> {
836        self.poll_write_priv(cx, buf)
837    }
838
839    fn poll_write_buf<B: Buf>(
840        self: Pin<&mut Self>,
841        cx: &mut Context<'_>,
842        buf: &mut B,
843    ) -> Poll<io::Result<usize>> {
844        self.poll_write_buf_priv(cx, buf)
845    }
846
847    #[inline]
848    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
849        // tcp flush is a no-op
850        Poll::Ready(Ok(()))
851    }
852
853    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
854        self.shutdown(std::net::Shutdown::Write)?;
855        Poll::Ready(Ok(()))
856    }
857}
858
859impl fmt::Debug for TcpStream {
860    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861        self.io.get_ref().fmt(f)
862    }
863}
864
865#[cfg(unix)]
866mod sys {
867    use super::TcpStream;
868    use std::os::unix::prelude::*;
869
870    impl AsRawFd for TcpStream {
871        fn as_raw_fd(&self) -> RawFd {
872            self.io.get_ref().as_raw_fd()
873        }
874    }
875}
876
877#[cfg(windows)]
878mod sys {
879    // TODO: let's land these upstream with mio and then we can add them here.
880    //
881    // use std::os::windows::prelude::*;
882    // use super::TcpStream;
883    //
884    // impl AsRawHandle for TcpStream {
885    //     fn as_raw_handle(&self) -> RawHandle {
886    //         self.io.get_ref().as_raw_handle()
887    //     }
888    // }
889}