broker_tokio/net/tcp/stream.rs
1use crate::future::poll_fn;
2use crate::io::{AsyncRead, AsyncWrite, PollEvented};
3use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
4use crate::net::ToSocketAddrs;
5
6use bytes::Buf;
7use iovec::IoVec;
8use std::convert::TryFrom;
9use std::fmt;
10use std::io::{self, Read, Write};
11use std::mem::MaybeUninit;
12use std::net::{self, Shutdown, SocketAddr};
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use std::time::Duration;
16
17cfg_tcp! {
18 /// A TCP stream between a local and a remote socket.
19 ///
20 /// A TCP stream can either be created by connecting to an endpoint, via the
21 /// [`connect`] method, or by [accepting] a connection from a [listener].
22 ///
23 /// [`connect`]: struct.TcpStream.html#method.connect
24 /// [accepting]: struct.TcpListener.html#method.accept
25 /// [listener]: struct.TcpListener.html
26 ///
27 /// # Examples
28 ///
29 /// ```no_run
30 /// use tokio::net::TcpStream;
31 /// use tokio::prelude::*;
32 /// use std::error::Error;
33 ///
34 /// #[tokio::main]
35 /// async fn main() -> Result<(), Box<dyn Error>> {
36 /// // Connect to a peer
37 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
38 ///
39 /// // Write some data.
40 /// stream.write_all(b"hello world!").await?;
41 ///
42 /// Ok(())
43 /// }
44 /// ```
45 pub struct TcpStream {
46 io: PollEvented<mio::net::TcpStream>,
47 }
48}
49
50impl TcpStream {
51 /// Opens a TCP connection to a remote host.
52 ///
53 /// `addr` is an address of the remote host. Anything which implements
54 /// `ToSocketAddrs` trait can be supplied for the address.
55 ///
56 /// If `addr` yields multiple addresses, connect will be attempted with each
57 /// of the addresses until a connection is successful. If none of the
58 /// addresses result in a successful connection, the error returned from the
59 /// last connection attempt (the last address) is returned.
60 ///
61 /// # Examples
62 ///
63 /// ```no_run
64 /// use tokio::net::TcpStream;
65 /// use tokio::prelude::*;
66 /// use std::error::Error;
67 ///
68 /// #[tokio::main]
69 /// async fn main() -> Result<(), Box<dyn Error>> {
70 /// // Connect to a peer
71 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
72 ///
73 /// // Write some data.
74 /// stream.write_all(b"hello world!").await?;
75 ///
76 /// Ok(())
77 /// }
78 /// ```
79 pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
80 let addrs = addr.to_socket_addrs().await?;
81
82 let mut last_err = None;
83
84 for addr in addrs {
85 match TcpStream::connect_addr(addr).await {
86 Ok(stream) => return Ok(stream),
87 Err(e) => last_err = Some(e),
88 }
89 }
90
91 Err(last_err.unwrap_or_else(|| {
92 io::Error::new(
93 io::ErrorKind::InvalidInput,
94 "could not resolve to any addresses",
95 )
96 }))
97 }
98
99 /// Establish a connection to the specified `addr`.
100 async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
101 let sys = mio::net::TcpStream::connect(&addr)?;
102 let stream = TcpStream::new(sys)?;
103
104 // Once we've connected, wait for the stream to be writable as
105 // that's when the actual connection has been initiated. Once we're
106 // writable we check for `take_socket_error` to see if the connect
107 // actually hit an error or not.
108 //
109 // If all that succeeded then we ship everything on up.
110 poll_fn(|cx| stream.io.poll_write_ready(cx)).await?;
111
112 if let Some(e) = stream.io.get_ref().take_error()? {
113 return Err(e);
114 }
115
116 Ok(stream)
117 }
118
119 pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
120 let io = PollEvented::new(connected)?;
121 Ok(TcpStream { io })
122 }
123
124 /// Create a new `TcpStream` from a `std::net::TcpStream`.
125 ///
126 /// This function will convert a TCP stream created by the standard library
127 /// to a TCP stream ready to be used with the provided event loop handle.
128 ///
129 /// # Examples
130 ///
131 /// ```rust,no_run
132 /// use std::error::Error;
133 /// use tokio::net::TcpStream;
134 ///
135 /// #[tokio::main]
136 /// async fn main() -> Result<(), Box<dyn Error>> {
137 /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
138 /// let stream = TcpStream::from_std(std_stream)?;
139 /// Ok(())
140 /// }
141 /// ```
142 ///
143 /// # Panics
144 ///
145 /// This function panics if thread-local runtime is not set.
146 ///
147 /// The runtime is usually set implicitly when this function is called
148 /// from a future driven by a tokio runtime, otherwise runtime can be set
149 /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
150 ///
151 /// # Panics
152 ///
153 /// This function panics if thread-local runtime is not set.
154 ///
155 /// The runtime is usually set implicitly when this function is called
156 /// from a future driven by a tokio runtime, otherwise runtime can be set
157 /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
158 pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> {
159 let io = mio::net::TcpStream::from_stream(stream)?;
160 let io = PollEvented::new(io)?;
161 Ok(TcpStream { io })
162 }
163
164 // Connect a TcpStream asynchronously that may be built with a net2 TcpBuilder.
165 //
166 // This should be removed in favor of some in-crate TcpSocket builder API.
167 #[doc(hidden)]
168 pub async fn connect_std(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
169 let io = mio::net::TcpStream::connect_stream(stream, addr)?;
170 let io = PollEvented::new(io)?;
171 let stream = TcpStream { io };
172
173 // Once we've connected, wait for the stream to be writable as
174 // that's when the actual connection has been initiated. Once we're
175 // writable we check for `take_socket_error` to see if the connect
176 // actually hit an error or not.
177 //
178 // If all that succeeded then we ship everything on up.
179 poll_fn(|cx| stream.io.poll_write_ready(cx)).await?;
180
181 if let Some(e) = stream.io.get_ref().take_error()? {
182 return Err(e);
183 }
184
185 Ok(stream)
186 }
187
188 /// Returns the local address that this stream is bound to.
189 ///
190 /// # Examples
191 ///
192 /// ```no_run
193 /// use tokio::net::TcpStream;
194 ///
195 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
196 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
197 ///
198 /// println!("{:?}", stream.local_addr()?);
199 /// # Ok(())
200 /// # }
201 /// ```
202 pub fn local_addr(&self) -> io::Result<SocketAddr> {
203 self.io.get_ref().local_addr()
204 }
205
206 /// Returns the remote address that this stream is connected to.
207 ///
208 /// # Examples
209 ///
210 /// ```no_run
211 /// use tokio::net::TcpStream;
212 ///
213 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
214 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
215 ///
216 /// println!("{:?}", stream.peer_addr()?);
217 /// # Ok(())
218 /// # }
219 /// ```
220 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
221 self.io.get_ref().peer_addr()
222 }
223
224 /// Attempt to receive data on the socket, without removing that data from
225 /// the queue, registering the current task for wakeup if data is not yet
226 /// available.
227 ///
228 /// # Return value
229 ///
230 /// The function returns:
231 ///
232 /// * `Poll::Pending` if data is not yet available.
233 /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
234 /// * `Poll::Ready(Err(e))` if an error is encountered.
235 ///
236 /// # Errors
237 ///
238 /// This function may encounter any standard I/O error except `WouldBlock`.
239 ///
240 /// # Examples
241 ///
242 /// ```no_run
243 /// use tokio::io;
244 /// use tokio::net::TcpStream;
245 ///
246 /// use futures::future::poll_fn;
247 ///
248 /// #[tokio::main]
249 /// async fn main() -> io::Result<()> {
250 /// let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
251 /// let mut buf = [0; 10];
252 ///
253 /// poll_fn(|cx| {
254 /// stream.poll_peek(cx, &mut buf)
255 /// }).await?;
256 ///
257 /// Ok(())
258 /// }
259 /// ```
260 pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
261 self.poll_peek2(cx, buf)
262 }
263
264 pub(super) fn poll_peek2(
265 &self,
266 cx: &mut Context<'_>,
267 buf: &mut [u8],
268 ) -> Poll<io::Result<usize>> {
269 ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
270
271 match self.io.get_ref().peek(buf) {
272 Ok(ret) => Poll::Ready(Ok(ret)),
273 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
274 self.io.clear_read_ready(cx, mio::Ready::readable())?;
275 Poll::Pending
276 }
277 Err(e) => Poll::Ready(Err(e)),
278 }
279 }
280
281 /// Receives data on the socket from the remote address to which it is
282 /// connected, without removing that data from the queue. On success,
283 /// returns the number of bytes peeked.
284 ///
285 /// Successive calls return the same data. This is accomplished by passing
286 /// `MSG_PEEK` as a flag to the underlying recv system call.
287 ///
288 /// # Examples
289 ///
290 /// ```no_run
291 /// use tokio::net::TcpStream;
292 /// use tokio::prelude::*;
293 /// use std::error::Error;
294 ///
295 /// #[tokio::main]
296 /// async fn main() -> Result<(), Box<dyn Error>> {
297 /// // Connect to a peer
298 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
299 ///
300 /// let mut b1 = [0; 10];
301 /// let mut b2 = [0; 10];
302 ///
303 /// // Peek at the data
304 /// let n = stream.peek(&mut b1).await?;
305 ///
306 /// // Read the data
307 /// assert_eq!(n, stream.read(&mut b2[..n]).await?);
308 /// assert_eq!(&b1[..n], &b2[..n]);
309 ///
310 /// Ok(())
311 /// }
312 /// ```
313 pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
314 poll_fn(|cx| self.poll_peek(cx, buf)).await
315 }
316
317 /// Shuts down the read, write, or both halves of this connection.
318 ///
319 /// This function will cause all pending and future I/O on the specified
320 /// portions to return immediately with an appropriate value (see the
321 /// documentation of `Shutdown`).
322 ///
323 /// # Examples
324 ///
325 /// ```no_run
326 /// use tokio::net::TcpStream;
327 /// use std::error::Error;
328 /// use std::net::Shutdown;
329 ///
330 /// #[tokio::main]
331 /// async fn main() -> Result<(), Box<dyn Error>> {
332 /// // Connect to a peer
333 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
334 ///
335 /// // Shutdown the stream
336 /// stream.shutdown(Shutdown::Write)?;
337 ///
338 /// Ok(())
339 /// }
340 /// ```
341 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
342 self.io.get_ref().shutdown(how)
343 }
344
345 /// Gets the value of the `TCP_NODELAY` option on this socket.
346 ///
347 /// For more information about this option, see [`set_nodelay`].
348 ///
349 /// [`set_nodelay`]: #method.set_nodelay
350 ///
351 /// # Examples
352 ///
353 /// ```no_run
354 /// use tokio::net::TcpStream;
355 ///
356 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
357 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
358 ///
359 /// println!("{:?}", stream.nodelay()?);
360 /// # Ok(())
361 /// # }
362 /// ```
363 pub fn nodelay(&self) -> io::Result<bool> {
364 self.io.get_ref().nodelay()
365 }
366
367 /// Sets the value of the `TCP_NODELAY` option on this socket.
368 ///
369 /// If set, this option disables the Nagle algorithm. This means that
370 /// segments are always sent as soon as possible, even if there is only a
371 /// small amount of data. When not set, data is buffered until there is a
372 /// sufficient amount to send out, thereby avoiding the frequent sending of
373 /// small packets.
374 ///
375 /// # Examples
376 ///
377 /// ```no_run
378 /// use tokio::net::TcpStream;
379 ///
380 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
381 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
382 ///
383 /// stream.set_nodelay(true)?;
384 /// # Ok(())
385 /// # }
386 /// ```
387 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
388 self.io.get_ref().set_nodelay(nodelay)
389 }
390
391 /// Gets the value of the `SO_RCVBUF` option on this socket.
392 ///
393 /// For more information about this option, see [`set_recv_buffer_size`].
394 ///
395 /// [`set_recv_buffer_size`]: #tymethod.set_recv_buffer_size
396 ///
397 /// # Examples
398 ///
399 /// ```no_run
400 /// use tokio::net::TcpStream;
401 ///
402 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
403 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
404 ///
405 /// println!("{:?}", stream.recv_buffer_size()?);
406 /// # Ok(())
407 /// # }
408 /// ```
409 pub fn recv_buffer_size(&self) -> io::Result<usize> {
410 self.io.get_ref().recv_buffer_size()
411 }
412
413 /// Sets the value of the `SO_RCVBUF` option on this socket.
414 ///
415 /// Changes the size of the operating system's receive buffer associated
416 /// with the socket.
417 ///
418 /// # Examples
419 ///
420 /// ```no_run
421 /// use tokio::net::TcpStream;
422 ///
423 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
424 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
425 ///
426 /// stream.set_recv_buffer_size(100)?;
427 /// # Ok(())
428 /// # }
429 /// ```
430 pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
431 self.io.get_ref().set_recv_buffer_size(size)
432 }
433
434 /// Gets the value of the `SO_SNDBUF` option on this socket.
435 ///
436 /// For more information about this option, see [`set_send_buffer`].
437 ///
438 /// [`set_send_buffer`]: #tymethod.set_send_buffer
439 ///
440 /// # Examples
441 ///
442 /// Returns whether keepalive messages are enabled on this socket, and if so
443 /// the duration of time between them.
444 ///
445 /// For more information about this option, see [`set_keepalive`].
446 ///
447 /// [`set_keepalive`]: #tymethod.set_keepalive
448 ///
449 /// # Examples
450 ///
451 /// ```no_run
452 /// use tokio::net::TcpStream;
453 ///
454 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
455 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
456 ///
457 /// println!("{:?}", stream.send_buffer_size()?);
458 /// # Ok(())
459 /// # }
460 /// ```
461 pub fn send_buffer_size(&self) -> io::Result<usize> {
462 self.io.get_ref().send_buffer_size()
463 }
464
465 /// Sets the value of the `SO_SNDBUF` option on this socket.
466 ///
467 /// Changes the size of the operating system's send buffer associated with
468 /// the socket.
469 ///
470 /// # Examples
471 ///
472 /// ```no_run
473 /// use tokio::net::TcpStream;
474 ///
475 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
476 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
477 ///
478 /// stream.set_send_buffer_size(100)?;
479 /// # Ok(())
480 /// # }
481 /// ```
482 pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
483 self.io.get_ref().set_send_buffer_size(size)
484 }
485
486 /// Returns whether keepalive messages are enabled on this socket, and if so
487 /// the duration of time between them.
488 ///
489 /// For more information about this option, see [`set_keepalive`].
490 ///
491 /// [`set_keepalive`]: #tymethod.set_keepalive
492 ///
493 /// # Examples
494 ///
495 /// ```no_run
496 /// use tokio::net::TcpStream;
497 ///
498 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
499 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
500 ///
501 /// println!("{:?}", stream.keepalive()?);
502 /// # Ok(())
503 /// # }
504 /// ```
505 pub fn keepalive(&self) -> io::Result<Option<Duration>> {
506 self.io.get_ref().keepalive()
507 }
508
509 /// Sets whether keepalive messages are enabled to be sent on this socket.
510 ///
511 /// On Unix, this option will set the `SO_KEEPALIVE` as well as the
512 /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform).
513 /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option.
514 ///
515 /// If `None` is specified then keepalive messages are disabled, otherwise
516 /// the duration specified will be the time to remain idle before sending a
517 /// TCP keepalive probe.
518 ///
519 /// Some platforms specify this value in seconds, so sub-second
520 /// specifications may be omitted.
521 ///
522 /// # Examples
523 ///
524 /// ```no_run
525 /// use tokio::net::TcpStream;
526 ///
527 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
528 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
529 ///
530 /// stream.set_keepalive(None)?;
531 /// # Ok(())
532 /// # }
533 /// ```
534 pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
535 self.io.get_ref().set_keepalive(keepalive)
536 }
537
538 /// Gets the value of the `IP_TTL` option for this socket.
539 ///
540 /// For more information about this option, see [`set_ttl`].
541 ///
542 /// [`set_ttl`]: #tymethod.set_ttl
543 ///
544 /// # Examples
545 ///
546 /// ```no_run
547 /// use tokio::net::TcpStream;
548 ///
549 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
550 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
551 ///
552 /// println!("{:?}", stream.ttl()?);
553 /// # Ok(())
554 /// # }
555 /// ```
556 pub fn ttl(&self) -> io::Result<u32> {
557 self.io.get_ref().ttl()
558 }
559
560 /// Sets the value for the `IP_TTL` option on this socket.
561 ///
562 /// This value sets the time-to-live field that is used in every packet sent
563 /// from this socket.
564 ///
565 /// # Examples
566 ///
567 /// ```no_run
568 /// use tokio::net::TcpStream;
569 ///
570 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
571 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
572 ///
573 /// stream.set_ttl(123)?;
574 /// # Ok(())
575 /// # }
576 /// ```
577 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
578 self.io.get_ref().set_ttl(ttl)
579 }
580
581 /// Reads the linger duration for this socket by getting the `SO_LINGER`
582 /// option.
583 ///
584 /// For more information about this option, see [`set_linger`].
585 ///
586 /// [`set_linger`]: #tymethod.set_linger
587 ///
588 /// # Examples
589 ///
590 /// ```no_run
591 /// use tokio::net::TcpStream;
592 ///
593 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
594 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
595 ///
596 /// println!("{:?}", stream.linger()?);
597 /// # Ok(())
598 /// # }
599 /// ```
600 pub fn linger(&self) -> io::Result<Option<Duration>> {
601 self.io.get_ref().linger()
602 }
603
604 /// Sets the linger duration of this socket by setting the `SO_LINGER`
605 /// option.
606 ///
607 /// This option controls the action taken when a stream has unsent messages
608 /// and the stream is closed. If `SO_LINGER` is set, the system
609 /// shall block the process until it can transmit the data or until the
610 /// time expires.
611 ///
612 /// If `SO_LINGER` is not specified, and the stream is closed, the system
613 /// handles the call in a way that allows the process to continue as quickly
614 /// as possible.
615 ///
616 /// # Examples
617 ///
618 /// ```no_run
619 /// use tokio::net::TcpStream;
620 ///
621 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
622 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
623 ///
624 /// stream.set_linger(None)?;
625 /// # Ok(())
626 /// # }
627 /// ```
628 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
629 self.io.get_ref().set_linger(dur)
630 }
631
632 /// Split a `TcpStream` into a read half and a write half, which can be used
633 /// to read and write the stream concurrently.
634 ///
635 /// See the module level documenation of [`split`](super::split) for more
636 /// details.
637 pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
638 split(self)
639 }
640
641 // == Poll IO functions that takes `&self` ==
642 //
643 // They are not public because (taken from the doc of `PollEvented`):
644 //
645 // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
646 // caller must ensure that there are at most two tasks that use a
647 // `PollEvented` instance concurrently. One for reading and one for writing.
648 // While violating this requirement is "safe" from a Rust memory model point
649 // of view, it will result in unexpected behavior in the form of lost
650 // notifications and tasks hanging.
651
652 pub(crate) fn poll_read_priv(
653 &self,
654 cx: &mut Context<'_>,
655 buf: &mut [u8],
656 ) -> Poll<io::Result<usize>> {
657 ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
658
659 match self.io.get_ref().read(buf) {
660 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
661 self.io.clear_read_ready(cx, mio::Ready::readable())?;
662 Poll::Pending
663 }
664 x => Poll::Ready(x),
665 }
666 }
667
668 pub(super) fn poll_write_priv(
669 &self,
670 cx: &mut Context<'_>,
671 buf: &[u8],
672 ) -> Poll<io::Result<usize>> {
673 ready!(self.io.poll_write_ready(cx))?;
674
675 match self.io.get_ref().write(buf) {
676 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
677 self.io.clear_write_ready(cx)?;
678 Poll::Pending
679 }
680 x => Poll::Ready(x),
681 }
682 }
683
684 pub(super) fn poll_write_buf_priv<B: Buf>(
685 &self,
686 cx: &mut Context<'_>,
687 buf: &mut B,
688 ) -> Poll<io::Result<usize>> {
689 use std::io::IoSlice;
690
691 ready!(self.io.poll_write_ready(cx))?;
692
693 // The `IoVec` (v0.1.x) type can't have a zero-length size, so create
694 // a dummy version from a 1-length slice which we'll overwrite with
695 // the `bytes_vectored` method.
696 static S: &[u8] = &[0];
697 const MAX_BUFS: usize = 64;
698
699 // IoSlice isn't Copy, so we must expand this manually ;_;
700 let mut slices: [IoSlice<'_>; MAX_BUFS] = [
701 IoSlice::new(S),
702 IoSlice::new(S),
703 IoSlice::new(S),
704 IoSlice::new(S),
705 IoSlice::new(S),
706 IoSlice::new(S),
707 IoSlice::new(S),
708 IoSlice::new(S),
709 IoSlice::new(S),
710 IoSlice::new(S),
711 IoSlice::new(S),
712 IoSlice::new(S),
713 IoSlice::new(S),
714 IoSlice::new(S),
715 IoSlice::new(S),
716 IoSlice::new(S),
717 IoSlice::new(S),
718 IoSlice::new(S),
719 IoSlice::new(S),
720 IoSlice::new(S),
721 IoSlice::new(S),
722 IoSlice::new(S),
723 IoSlice::new(S),
724 IoSlice::new(S),
725 IoSlice::new(S),
726 IoSlice::new(S),
727 IoSlice::new(S),
728 IoSlice::new(S),
729 IoSlice::new(S),
730 IoSlice::new(S),
731 IoSlice::new(S),
732 IoSlice::new(S),
733 IoSlice::new(S),
734 IoSlice::new(S),
735 IoSlice::new(S),
736 IoSlice::new(S),
737 IoSlice::new(S),
738 IoSlice::new(S),
739 IoSlice::new(S),
740 IoSlice::new(S),
741 IoSlice::new(S),
742 IoSlice::new(S),
743 IoSlice::new(S),
744 IoSlice::new(S),
745 IoSlice::new(S),
746 IoSlice::new(S),
747 IoSlice::new(S),
748 IoSlice::new(S),
749 IoSlice::new(S),
750 IoSlice::new(S),
751 IoSlice::new(S),
752 IoSlice::new(S),
753 IoSlice::new(S),
754 IoSlice::new(S),
755 IoSlice::new(S),
756 IoSlice::new(S),
757 IoSlice::new(S),
758 IoSlice::new(S),
759 IoSlice::new(S),
760 IoSlice::new(S),
761 IoSlice::new(S),
762 IoSlice::new(S),
763 IoSlice::new(S),
764 IoSlice::new(S),
765 ];
766 let cnt = buf.bytes_vectored(&mut slices);
767
768 let iovec = <&IoVec>::from(S);
769 let mut vecs = [iovec; MAX_BUFS];
770 for i in 0..cnt {
771 vecs[i] = (*slices[i]).into();
772 }
773
774 match self.io.get_ref().write_bufs(&vecs[..cnt]) {
775 Ok(n) => {
776 buf.advance(n);
777 Poll::Ready(Ok(n))
778 }
779 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
780 self.io.clear_write_ready(cx)?;
781 Poll::Pending
782 }
783 Err(e) => Poll::Ready(Err(e)),
784 }
785 }
786}
787
788impl TryFrom<TcpStream> for mio::net::TcpStream {
789 type Error = io::Error;
790
791 /// Consumes value, returning the mio I/O object.
792 ///
793 /// See [`PollEvented::into_inner`] for more details about
794 /// resource deregistration that happens during the call.
795 ///
796 /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
797 fn try_from(value: TcpStream) -> Result<Self, Self::Error> {
798 value.io.into_inner()
799 }
800}
801
802impl TryFrom<net::TcpStream> for TcpStream {
803 type Error = io::Error;
804
805 /// Consumes stream, returning the tokio I/O object.
806 ///
807 /// This is equivalent to
808 /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
809 fn try_from(stream: net::TcpStream) -> Result<Self, Self::Error> {
810 Self::from_std(stream)
811 }
812}
813
814// ===== impl Read / Write =====
815
816impl AsyncRead for TcpStream {
817 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
818 false
819 }
820
821 fn poll_read(
822 self: Pin<&mut Self>,
823 cx: &mut Context<'_>,
824 buf: &mut [u8],
825 ) -> Poll<io::Result<usize>> {
826 self.poll_read_priv(cx, buf)
827 }
828}
829
830impl AsyncWrite for TcpStream {
831 fn poll_write(
832 self: Pin<&mut Self>,
833 cx: &mut Context<'_>,
834 buf: &[u8],
835 ) -> Poll<io::Result<usize>> {
836 self.poll_write_priv(cx, buf)
837 }
838
839 fn poll_write_buf<B: Buf>(
840 self: Pin<&mut Self>,
841 cx: &mut Context<'_>,
842 buf: &mut B,
843 ) -> Poll<io::Result<usize>> {
844 self.poll_write_buf_priv(cx, buf)
845 }
846
847 #[inline]
848 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
849 // tcp flush is a no-op
850 Poll::Ready(Ok(()))
851 }
852
853 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
854 self.shutdown(std::net::Shutdown::Write)?;
855 Poll::Ready(Ok(()))
856 }
857}
858
859impl fmt::Debug for TcpStream {
860 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861 self.io.get_ref().fmt(f)
862 }
863}
864
865#[cfg(unix)]
866mod sys {
867 use super::TcpStream;
868 use std::os::unix::prelude::*;
869
870 impl AsRawFd for TcpStream {
871 fn as_raw_fd(&self) -> RawFd {
872 self.io.get_ref().as_raw_fd()
873 }
874 }
875}
876
877#[cfg(windows)]
878mod sys {
879 // TODO: let's land these upstream with mio and then we can add them here.
880 //
881 // use std::os::windows::prelude::*;
882 // use super::TcpStream;
883 //
884 // impl AsRawHandle for TcpStream {
885 // fn as_raw_handle(&self) -> RawHandle {
886 // self.io.get_ref().as_raw_handle()
887 // }
888 // }
889}