tokio_tcp/
stream.rs

1use std::fmt;
2use std::io::{self, Read, Write};
3use std::mem;
4use std::net::{self, Shutdown, SocketAddr};
5use std::time::Duration;
6
7use bytes::{Buf, BufMut};
8use futures::{Async, Future, Poll};
9use iovec::IoVec;
10use mio;
11use tokio_io::{AsyncRead, AsyncWrite};
12use tokio_reactor::{Handle, PollEvented};
13
14/// An I/O object representing a TCP stream connected to a remote endpoint.
15///
16/// A TCP stream can either be created by connecting to an endpoint, via the
17/// [`connect`] method, or by [accepting] a connection from a [listener].
18///
19/// [`connect`]: struct.TcpStream.html#method.connect
20/// [accepting]: struct.TcpListener.html#method.accept
21/// [listener]: struct.TcpListener.html
22///
23/// # Examples
24///
25/// ```
26/// # extern crate tokio;
27/// # extern crate futures;
28/// use futures::Future;
29/// use tokio::io::AsyncWrite;
30/// use tokio::net::TcpStream;
31/// use std::net::SocketAddr;
32///
33/// # fn main() -> Result<(), Box<std::error::Error>> {
34/// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
35/// let stream = TcpStream::connect(&addr);
36/// stream.map(|mut stream| {
37///     // Attempt to write bytes asynchronously to the stream
38///     stream.poll_write(&[1]);
39/// });
40/// # Ok(())
41/// # }
42/// ```
43pub struct TcpStream {
44    io: PollEvented<mio::net::TcpStream>,
45}
46
47/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
48/// when the stream is connected.
49#[must_use = "futures do nothing unless polled"]
50#[derive(Debug)]
51pub struct ConnectFuture {
52    inner: ConnectFutureState,
53}
54
55#[must_use = "futures do nothing unless polled"]
56#[derive(Debug)]
57enum ConnectFutureState {
58    Waiting(TcpStream),
59    Error(io::Error),
60    Empty,
61}
62
63impl TcpStream {
64    /// Create a new TCP stream connected to the specified address.
65    ///
66    /// This function will create a new TCP socket and attempt to connect it to
67    /// the `addr` provided. The returned future will be resolved once the
68    /// stream has successfully connected, or it will return an error if one
69    /// occurs.
70    ///
71    /// # Examples
72    ///
73    /// ```
74    /// # extern crate tokio;
75    /// # extern crate futures;
76    /// use futures::Future;
77    /// use tokio::net::TcpStream;
78    /// use std::net::SocketAddr;
79    ///
80    /// # fn main() -> Result<(), Box<std::error::Error>> {
81    /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
82    /// let stream = TcpStream::connect(&addr)
83    ///     .map(|stream|
84    ///         println!("successfully connected to {}", stream.local_addr().unwrap()));
85    /// # Ok(())
86    /// # }
87    /// ```
88    pub fn connect(addr: &SocketAddr) -> ConnectFuture {
89        use self::ConnectFutureState::*;
90
91        let inner = match mio::net::TcpStream::connect(addr) {
92            Ok(tcp) => Waiting(TcpStream::new(tcp)),
93            Err(e) => Error(e),
94        };
95
96        ConnectFuture { inner }
97    }
98
99    pub(crate) fn new(connected: mio::net::TcpStream) -> TcpStream {
100        let io = PollEvented::new(connected);
101        TcpStream { io }
102    }
103
104    /// Create a new `TcpStream` from a `net::TcpStream`.
105    ///
106    /// This function will convert a TCP stream created by the standard library
107    /// to a TCP stream ready to be used with the provided event loop handle.
108    /// Use `Handle::default()` to lazily bind to an event loop, just like `connect` does.
109    ///
110    /// # Examples
111    ///
112    /// ```no_run
113    /// # extern crate tokio;
114    /// # extern crate tokio_reactor;
115    /// use tokio::net::TcpStream;
116    /// use std::net::TcpStream as StdTcpStream;
117    /// use tokio_reactor::Handle;
118    ///
119    /// # fn main() -> Result<(), Box<std::error::Error>> {
120    /// let std_stream = StdTcpStream::connect("127.0.0.1:34254")?;
121    /// let stream = TcpStream::from_std(std_stream, &Handle::default())?;
122    /// # Ok(())
123    /// # }
124    /// ```
125    pub fn from_std(stream: net::TcpStream, handle: &Handle) -> io::Result<TcpStream> {
126        let io = mio::net::TcpStream::from_stream(stream)?;
127        let io = PollEvented::new_with_handle(io, handle)?;
128
129        Ok(TcpStream { io })
130    }
131
132    /// Creates a new `TcpStream` from the pending socket inside the given
133    /// `std::net::TcpStream`, connecting it to the address specified.
134    ///
135    /// This constructor allows configuring the socket before it's actually
136    /// connected, and this function will transfer ownership to the returned
137    /// `TcpStream` if successful. An unconnected `TcpStream` can be created
138    /// with the `net2::TcpBuilder` type (and also configured via that route).
139    ///
140    /// The platform specific behavior of this function looks like:
141    ///
142    /// * On Unix, the socket is placed into nonblocking mode and then a
143    ///   `connect` call is issued.
144    ///
145    /// * On Windows, the address is stored internally and the connect operation
146    ///   is issued when the returned `TcpStream` is registered with an event
147    ///   loop. Note that on Windows you must `bind` a socket before it can be
148    ///   connected, so if a custom `TcpBuilder` is used it should be bound
149    ///   (perhaps to `INADDR_ANY`) before this method is called.
150    pub fn connect_std(
151        stream: net::TcpStream,
152        addr: &SocketAddr,
153        handle: &Handle,
154    ) -> ConnectFuture {
155        use self::ConnectFutureState::*;
156
157        let io = mio::net::TcpStream::connect_stream(stream, addr)
158            .and_then(|io| PollEvented::new_with_handle(io, handle));
159
160        let inner = match io {
161            Ok(io) => Waiting(TcpStream { io }),
162            Err(e) => Error(e),
163        };
164
165        ConnectFuture { inner: inner }
166    }
167
168    /// Check the TCP stream's read readiness state.
169    ///
170    /// The mask argument allows specifying what readiness to notify on. This
171    /// can be any value, including platform specific readiness, **except**
172    /// `writable`. HUP is always implicitly included on platforms that support
173    /// it.
174    ///
175    /// If the resource is not ready for a read then `Async::NotReady` is
176    /// returned and the current task is notified once a new event is received.
177    ///
178    /// The stream will remain in a read-ready state until calls to `poll_read`
179    /// return `NotReady`.
180    ///
181    /// # Panics
182    ///
183    /// This function panics if:
184    ///
185    /// * `ready` includes writable.
186    /// * called from outside of a task context.
187    ///
188    /// # Examples
189    ///
190    /// ```
191    /// # extern crate mio;
192    /// # extern crate tokio;
193    /// # extern crate futures;
194    /// use mio::Ready;
195    /// use futures::Async;
196    /// use futures::Future;
197    /// use tokio::net::TcpStream;
198    /// use std::net::SocketAddr;
199    ///
200    /// # fn main() -> Result<(), Box<std::error::Error>> {
201    /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
202    /// let stream = TcpStream::connect(&addr);
203    /// stream.map(|stream| {
204    ///     match stream.poll_read_ready(Ready::readable()) {
205    ///         Ok(Async::Ready(_)) => println!("read ready"),
206    ///         Ok(Async::NotReady) => println!("not read ready"),
207    ///         Err(e) => eprintln!("got error: {}", e),
208    /// }
209    /// });
210    /// # Ok(())
211    /// # }
212    /// ```
213    pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> {
214        self.io.poll_read_ready(mask)
215    }
216
217    /// Check the TCP stream's write readiness state.
218    ///
219    /// This always checks for writable readiness and also checks for HUP
220    /// readiness on platforms that support it.
221    ///
222    /// If the resource is not ready for a write then `Async::NotReady` is
223    /// returned and the current task is notified once a new event is received.
224    ///
225    /// The I/O resource will remain in a write-ready state until calls to
226    /// `poll_write` return `NotReady`.
227    ///
228    /// # Panics
229    ///
230    /// This function panics if called from outside of a task context.
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// # extern crate tokio;
236    /// # extern crate futures;
237    /// use futures::Async;
238    /// use futures::Future;
239    /// use tokio::net::TcpStream;
240    /// use std::net::SocketAddr;
241    ///
242    /// # fn main() -> Result<(), Box<std::error::Error>> {
243    /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
244    /// let stream = TcpStream::connect(&addr);
245    /// stream.map(|stream| {
246    ///     match stream.poll_write_ready() {
247    ///         Ok(Async::Ready(_)) => println!("write ready"),
248    ///         Ok(Async::NotReady) => println!("not write ready"),
249    ///         Err(e) => eprintln!("got error: {}", e),
250    /// }
251    /// });
252    /// # Ok(())
253    /// # }
254    /// ```
255    pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
256        self.io.poll_write_ready()
257    }
258
259    /// Returns the local address that this stream is bound to.
260    ///
261    /// # Examples
262    ///
263    /// ```
264    /// # extern crate tokio;
265    /// # extern crate futures;
266    /// use tokio::net::TcpStream;
267    /// use futures::Future;
268    /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
269    ///
270    /// # fn main() -> Result<(), Box<std::error::Error>> {
271    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
272    /// let stream = TcpStream::connect(&addr);
273    /// stream.map(|stream| {
274    ///     assert_eq!(stream.local_addr().unwrap(),
275    ///         SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080)));
276    /// });
277    /// # Ok(())
278    /// # }
279    /// ```
280    pub fn local_addr(&self) -> io::Result<SocketAddr> {
281        self.io.get_ref().local_addr()
282    }
283
284    /// Returns the remote address that this stream is connected to.
285    /// # Examples
286    ///
287    /// ```
288    /// # extern crate tokio;
289    /// # extern crate futures;
290    /// use tokio::net::TcpStream;
291    /// use futures::Future;
292    /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
293    ///
294    /// # fn main() -> Result<(), Box<std::error::Error>> {
295    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
296    /// let stream = TcpStream::connect(&addr);
297    /// stream.map(|stream| {
298    ///     assert_eq!(stream.peer_addr().unwrap(),
299    ///         SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080)));
300    /// });
301    /// # Ok(())
302    /// # }
303    /// ```
304    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
305        self.io.get_ref().peer_addr()
306    }
307
308    #[deprecated(since = "0.1.2", note = "use poll_peek instead")]
309    #[doc(hidden)]
310    pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
311        match self.poll_peek(buf)? {
312            Async::Ready(n) => Ok(n),
313            Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
314        }
315    }
316
317    /// Receives data on the socket from the remote address to which it is
318    /// connected, without removing that data from the queue. On success,
319    /// returns the number of bytes peeked.
320    ///
321    /// Successive calls return the same data. This is accomplished by passing
322    /// `MSG_PEEK` as a flag to the underlying recv system call.
323    ///
324    /// # Return
325    ///
326    /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
327    ///
328    /// If no data is available for reading, the method returns
329    /// `Ok(Async::NotReady)` and arranges for the current task to receive a
330    /// notification when the socket becomes readable or is closed.
331    ///
332    /// # Panics
333    ///
334    /// This function will panic if called from outside of a task context.
335    ///
336    /// # Examples
337    ///
338    /// ```
339    /// # extern crate tokio;
340    /// # extern crate futures;
341    /// use tokio::net::TcpStream;
342    /// use futures::Async;
343    /// use futures::Future;
344    /// use std::net::SocketAddr;
345    ///
346    /// # fn main() -> Result<(), Box<std::error::Error>> {
347    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
348    /// let stream = TcpStream::connect(&addr);
349    /// stream.map(|mut stream| {
350    ///     let mut buf = [0; 10];
351    ///     match stream.poll_peek(&mut buf) {
352    ///        Ok(Async::Ready(len)) => println!("read {} bytes", len),
353    ///        Ok(Async::NotReady) => println!("no data available"),
354    ///        Err(e) => eprintln!("got error: {}", e),
355    ///     }
356    /// });
357    /// # Ok(())
358    /// # }
359    /// ```
360    pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
361        try_ready!(self.io.poll_read_ready(mio::Ready::readable()));
362
363        match self.io.get_ref().peek(buf) {
364            Ok(ret) => Ok(ret.into()),
365            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
366                self.io.clear_read_ready(mio::Ready::readable())?;
367                Ok(Async::NotReady)
368            }
369            Err(e) => Err(e),
370        }
371    }
372
373    /// Shuts down the read, write, or both halves of this connection.
374    ///
375    /// This function will cause all pending and future I/O on the specified
376    /// portions to return immediately with an appropriate value (see the
377    /// documentation of `Shutdown`).
378    ///
379    /// # Examples
380    ///
381    /// ```
382    /// # extern crate tokio;
383    /// # extern crate futures;
384    /// use tokio::net::TcpStream;
385    /// use futures::Future;
386    /// use std::net::{Shutdown, SocketAddr};
387    ///
388    /// # fn main() -> Result<(), Box<std::error::Error>> {
389    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
390    /// let stream = TcpStream::connect(&addr);
391    /// stream.map(|stream| {
392    ///     stream.shutdown(Shutdown::Both)
393    /// });
394    /// # Ok(())
395    /// # }
396    /// ```
397    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
398        self.io.get_ref().shutdown(how)
399    }
400
401    /// Gets the value of the `TCP_NODELAY` option on this socket.
402    ///
403    /// For more information about this option, see [`set_nodelay`].
404    ///
405    /// [`set_nodelay`]: #method.set_nodelay
406    ///
407    /// # Examples
408    ///
409    /// ```
410    /// # extern crate tokio;
411    /// # extern crate futures;
412    /// use tokio::net::TcpStream;
413    /// use futures::Future;
414    /// use std::net::SocketAddr;
415    ///
416    /// # fn main() -> Result<(), Box<std::error::Error>> {
417    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
418    /// let stream = TcpStream::connect(&addr);
419    /// stream.map(|stream| {
420    ///     stream.set_nodelay(true).expect("set_nodelay call failed");;
421    ///     assert_eq!(stream.nodelay().unwrap_or(false), true);
422    /// });
423    /// # Ok(())
424    /// # }
425    /// ```
426    pub fn nodelay(&self) -> io::Result<bool> {
427        self.io.get_ref().nodelay()
428    }
429
430    /// Sets the value of the `TCP_NODELAY` option on this socket.
431    ///
432    /// If set, this option disables the Nagle algorithm. This means that
433    /// segments are always sent as soon as possible, even if there is only a
434    /// small amount of data. When not set, data is buffered until there is a
435    /// sufficient amount to send out, thereby avoiding the frequent sending of
436    /// small packets.
437    ///
438    /// # Examples
439    ///
440    /// ```
441    /// # extern crate tokio;
442    /// # extern crate futures;
443    /// use tokio::net::TcpStream;
444    /// use futures::Future;
445    /// use std::net::SocketAddr;
446    ///
447    /// # fn main() -> Result<(), Box<std::error::Error>> {
448    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
449    /// let stream = TcpStream::connect(&addr);
450    /// stream.map(|stream| {
451    ///     stream.set_nodelay(true).expect("set_nodelay call failed");
452    /// });
453    /// # Ok(())
454    /// # }
455    /// ```
456    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
457        self.io.get_ref().set_nodelay(nodelay)
458    }
459
460    /// Gets the value of the `SO_RCVBUF` option on this socket.
461    ///
462    /// For more information about this option, see [`set_recv_buffer_size`].
463    ///
464    /// [`set_recv_buffer_size`]: #tymethod.set_recv_buffer_size
465    ///
466    /// # Examples
467    ///
468    /// ```
469    /// # extern crate tokio;
470    /// # extern crate futures;
471    /// use tokio::net::TcpStream;
472    /// use futures::Future;
473    /// use std::net::SocketAddr;
474    ///
475    /// # fn main() -> Result<(), Box<std::error::Error>> {
476    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
477    /// let stream = TcpStream::connect(&addr);
478    /// stream.map(|stream| {
479    ///     stream.set_recv_buffer_size(100).expect("set_recv_buffer_size failed");
480    ///     assert_eq!(stream.recv_buffer_size().unwrap_or(0), 100);
481    /// });
482    /// # Ok(())
483    /// # }
484    /// ```
485    pub fn recv_buffer_size(&self) -> io::Result<usize> {
486        self.io.get_ref().recv_buffer_size()
487    }
488
489    /// Sets the value of the `SO_RCVBUF` option on this socket.
490    ///
491    /// Changes the size of the operating system's receive buffer associated
492    /// with the socket.
493    ///
494    /// # Examples
495    ///
496    /// ```
497    /// # extern crate tokio;
498    /// # extern crate futures;
499    /// use tokio::net::TcpStream;
500    /// use futures::Future;
501    /// use std::net::SocketAddr;
502    ///
503    /// # fn main() -> Result<(), Box<std::error::Error>> {
504    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
505    /// let stream = TcpStream::connect(&addr);
506    /// stream.map(|stream| {
507    ///     stream.set_recv_buffer_size(100).expect("set_recv_buffer_size failed");
508    /// });
509    /// # Ok(())
510    /// # }
511    /// ```
512    pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
513        self.io.get_ref().set_recv_buffer_size(size)
514    }
515
516    /// Gets the value of the `SO_SNDBUF` option on this socket.
517    ///
518    /// For more information about this option, see [`set_send_buffer`].
519    ///
520    /// [`set_send_buffer`]: #tymethod.set_send_buffer
521    ///
522    /// # Examples
523    ///
524    /// ```
525    /// # extern crate tokio;
526    /// # extern crate futures;
527    /// use tokio::net::TcpStream;
528    /// use futures::Future;
529    /// use std::net::SocketAddr;
530    ///
531    /// # fn main() -> Result<(), Box<std::error::Error>> {
532    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
533    /// let stream = TcpStream::connect(&addr);
534    /// stream.map(|stream| {
535    ///     stream.set_send_buffer_size(100).expect("set_send_buffer_size failed");
536    ///     assert_eq!(stream.send_buffer_size().unwrap_or(0), 100);
537    /// });
538    /// # Ok(())
539    /// # }
540    /// ```
541    pub fn send_buffer_size(&self) -> io::Result<usize> {
542        self.io.get_ref().send_buffer_size()
543    }
544
545    /// Sets the value of the `SO_SNDBUF` option on this socket.
546    ///
547    /// Changes the size of the operating system's send buffer associated with
548    /// the socket.
549    ///
550    /// # Examples
551    ///
552    /// ```
553    /// # extern crate tokio;
554    /// # extern crate futures;
555    /// use tokio::net::TcpStream;
556    /// use futures::Future;
557    /// use std::net::SocketAddr;
558    ///
559    /// # fn main() -> Result<(), Box<std::error::Error>> {
560    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
561    /// let stream = TcpStream::connect(&addr);
562    /// stream.map(|stream| {
563    ///     stream.set_send_buffer_size(100).expect("set_send_buffer_size failed");
564    /// });
565    /// # Ok(())
566    /// # }
567    /// ```
568    pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
569        self.io.get_ref().set_send_buffer_size(size)
570    }
571
572    /// Returns whether keepalive messages are enabled on this socket, and if so
573    /// the duration of time between them.
574    ///
575    /// For more information about this option, see [`set_keepalive`].
576    ///
577    /// [`set_keepalive`]: #tymethod.set_keepalive
578    ///
579    /// # Examples
580    ///
581    /// ```
582    /// # extern crate tokio;
583    /// # extern crate futures;
584    /// use tokio::net::TcpStream;
585    /// use futures::Future;
586    /// use std::net::SocketAddr;
587    ///
588    /// # fn main() -> Result<(), Box<std::error::Error>> {
589    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
590    /// let stream = TcpStream::connect(&addr);
591    /// stream.map(|stream| {
592    ///     stream.set_keepalive(None).expect("set_keepalive failed");
593    ///     assert_eq!(stream.keepalive().unwrap(), None);
594    /// });
595    /// # Ok(())
596    /// # }
597    /// ```
598    pub fn keepalive(&self) -> io::Result<Option<Duration>> {
599        self.io.get_ref().keepalive()
600    }
601
602    /// Sets whether keepalive messages are enabled to be sent on this socket.
603    ///
604    /// On Unix, this option will set the `SO_KEEPALIVE` as well as the
605    /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform).
606    /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option.
607    ///
608    /// If `None` is specified then keepalive messages are disabled, otherwise
609    /// the duration specified will be the time to remain idle before sending a
610    /// TCP keepalive probe.
611    ///
612    /// Some platforms specify this value in seconds, so sub-second
613    /// specifications may be omitted.
614    ///
615    /// # Examples
616    ///
617    /// ```
618    /// # extern crate tokio;
619    /// # extern crate futures;
620    /// use tokio::net::TcpStream;
621    /// use futures::Future;
622    /// use std::net::SocketAddr;
623    ///
624    /// # fn main() -> Result<(), Box<std::error::Error>> {
625    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
626    /// let stream = TcpStream::connect(&addr);
627    /// stream.map(|stream| {
628    ///     stream.set_keepalive(None).expect("set_keepalive failed");
629    /// });
630    /// # Ok(())
631    /// # }
632    /// ```
633    pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
634        self.io.get_ref().set_keepalive(keepalive)
635    }
636
637    /// Gets the value of the `IP_TTL` option for this socket.
638    ///
639    /// For more information about this option, see [`set_ttl`].
640    ///
641    /// [`set_ttl`]: #tymethod.set_ttl
642    ///
643    /// # Examples
644    ///
645    /// ```
646    /// # extern crate tokio;
647    /// # extern crate futures;
648    /// use tokio::net::TcpStream;
649    /// use futures::Future;
650    /// use std::net::SocketAddr;
651    ///
652    /// # fn main() -> Result<(), Box<std::error::Error>> {
653    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
654    /// let stream = TcpStream::connect(&addr);
655    /// stream.map(|stream| {
656    ///     stream.set_ttl(100).expect("set_ttl failed");
657    ///     assert_eq!(stream.ttl().unwrap_or(0), 100);
658    /// });
659    /// # Ok(())
660    /// # }
661    /// ```
662    pub fn ttl(&self) -> io::Result<u32> {
663        self.io.get_ref().ttl()
664    }
665
666    /// Sets the value for the `IP_TTL` option on this socket.
667    ///
668    /// This value sets the time-to-live field that is used in every packet sent
669    /// from this socket.
670    ///
671    /// # Examples
672    ///
673    /// ```
674    /// # extern crate tokio;
675    /// # extern crate futures;
676    /// use tokio::net::TcpStream;
677    /// use futures::Future;
678    /// use std::net::SocketAddr;
679    ///
680    /// # fn main() -> Result<(), Box<std::error::Error>> {
681    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
682    /// let stream = TcpStream::connect(&addr);
683    /// stream.map(|stream| {
684    ///     stream.set_ttl(100).expect("set_ttl failed");
685    /// });
686    /// # Ok(())
687    /// # }
688    /// ```
689    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
690        self.io.get_ref().set_ttl(ttl)
691    }
692
693    /// Reads the linger duration for this socket by getting the `SO_LINGER`
694    /// option.
695    ///
696    /// For more information about this option, see [`set_linger`].
697    ///
698    /// [`set_linger`]: #tymethod.set_linger
699    ///
700    /// # Examples
701    ///
702    /// ```
703    /// # extern crate tokio;
704    /// # extern crate futures;
705    /// use tokio::net::TcpStream;
706    /// use futures::Future;
707    /// use std::net::SocketAddr;
708    ///
709    /// # fn main() -> Result<(), Box<std::error::Error>> {
710    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
711    /// let stream = TcpStream::connect(&addr);
712    /// stream.map(|stream| {
713    ///     stream.set_linger(None).expect("set_linger failed");
714    ///     assert_eq!(stream.linger().unwrap(), None);
715    /// });
716    /// # Ok(())
717    /// # }
718    /// ```
719    pub fn linger(&self) -> io::Result<Option<Duration>> {
720        self.io.get_ref().linger()
721    }
722
723    /// Sets the linger duration of this socket by setting the `SO_LINGER`
724    /// option.
725    ///
726    /// This option controls the action taken when a stream has unsent messages
727    /// and the stream is closed. If `SO_LINGER` is set, the system
728    /// shall block the process  until it can transmit the data or until the
729    /// time expires.
730    ///
731    /// If `SO_LINGER` is not specified, and the stream is closed, the system
732    /// handles the call in a way that allows the process to continue as quickly
733    /// as possible.
734    ///
735    /// # Examples
736    ///
737    /// ```
738    /// # extern crate tokio;
739    /// # extern crate futures;
740    /// use tokio::net::TcpStream;
741    /// use futures::Future;
742    /// use std::net::SocketAddr;
743    ///
744    /// # fn main() -> Result<(), Box<std::error::Error>> {
745    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
746    /// let stream = TcpStream::connect(&addr);
747    /// stream.map(|stream| {
748    ///     stream.set_linger(None).expect("set_linger failed");
749    /// });
750    /// # Ok(())
751    /// # }
752    /// ```
753    pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
754        self.io.get_ref().set_linger(dur)
755    }
756
757    /// Creates a new independently owned handle to the underlying socket.
758    ///
759    /// The returned `TcpStream` is a reference to the same stream that this
760    /// object references. Both handles will read and write the same stream of
761    /// data, and options set on one stream will be propagated to the other
762    /// stream.
763    ///
764    /// # Examples
765    ///
766    /// ```
767    /// # extern crate tokio;
768    /// # extern crate futures;
769    /// use tokio::net::TcpStream;
770    /// use futures::Future;
771    /// use std::net::SocketAddr;
772    ///
773    /// # fn main() -> Result<(), Box<std::error::Error>> {
774    /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
775    /// let stream = TcpStream::connect(&addr);
776    /// stream.map(|stream| {
777    ///     let clone = stream.try_clone().unwrap();
778    /// });
779    /// # Ok(())
780    /// # }
781    /// ```
782    #[deprecated(since = "0.1.14", note = "use `split()` instead")]
783    #[doc(hidden)]
784    pub fn try_clone(&self) -> io::Result<TcpStream> {
785        // Rationale for deprecation:
786        // - https://github.com/tokio-rs/tokio/pull/824
787        // - https://github.com/tokio-rs/tokio/issues/774#issuecomment-451059317
788        let msg = "`TcpStream::try_clone()` is deprecated because it doesn't work as intended";
789        Err(io::Error::new(io::ErrorKind::Other, msg))
790    }
791}
792
793// ===== impl Read / Write =====
794
795impl Read for TcpStream {
796    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
797        self.io.read(buf)
798    }
799}
800
801impl Write for TcpStream {
802    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
803        self.io.write(buf)
804    }
805    fn flush(&mut self) -> io::Result<()> {
806        Ok(())
807    }
808}
809
810impl AsyncRead for TcpStream {
811    unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
812        false
813    }
814
815    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
816        <&TcpStream>::read_buf(&mut &*self, buf)
817    }
818}
819
820impl AsyncWrite for TcpStream {
821    fn shutdown(&mut self) -> Poll<(), io::Error> {
822        <&TcpStream>::shutdown(&mut &*self)
823    }
824
825    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
826        <&TcpStream>::write_buf(&mut &*self, buf)
827    }
828}
829
830// ===== impl Read / Write for &'a =====
831
832impl<'a> Read for &'a TcpStream {
833    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
834        (&self.io).read(buf)
835    }
836}
837
838impl<'a> Write for &'a TcpStream {
839    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
840        (&self.io).write(buf)
841    }
842
843    fn flush(&mut self) -> io::Result<()> {
844        (&self.io).flush()
845    }
846}
847
848impl<'a> AsyncRead for &'a TcpStream {
849    unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
850        false
851    }
852
853    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
854        if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? {
855            return Ok(Async::NotReady);
856        }
857
858        let r = unsafe {
859            // The `IoVec` type can't have a 0-length size, so we create a bunch
860            // of dummy versions on the stack with 1 length which we'll quickly
861            // overwrite.
862            let b1: &mut [u8] = &mut [0];
863            let b2: &mut [u8] = &mut [0];
864            let b3: &mut [u8] = &mut [0];
865            let b4: &mut [u8] = &mut [0];
866            let b5: &mut [u8] = &mut [0];
867            let b6: &mut [u8] = &mut [0];
868            let b7: &mut [u8] = &mut [0];
869            let b8: &mut [u8] = &mut [0];
870            let b9: &mut [u8] = &mut [0];
871            let b10: &mut [u8] = &mut [0];
872            let b11: &mut [u8] = &mut [0];
873            let b12: &mut [u8] = &mut [0];
874            let b13: &mut [u8] = &mut [0];
875            let b14: &mut [u8] = &mut [0];
876            let b15: &mut [u8] = &mut [0];
877            let b16: &mut [u8] = &mut [0];
878            let mut bufs: [&mut IoVec; 16] = [
879                b1.into(),
880                b2.into(),
881                b3.into(),
882                b4.into(),
883                b5.into(),
884                b6.into(),
885                b7.into(),
886                b8.into(),
887                b9.into(),
888                b10.into(),
889                b11.into(),
890                b12.into(),
891                b13.into(),
892                b14.into(),
893                b15.into(),
894                b16.into(),
895            ];
896            let n = buf.bytes_vec_mut(&mut bufs);
897            self.io.get_ref().read_bufs(&mut bufs[..n])
898        };
899
900        match r {
901            Ok(n) => {
902                unsafe {
903                    buf.advance_mut(n);
904                }
905                Ok(Async::Ready(n))
906            }
907            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
908                self.io.clear_read_ready(mio::Ready::readable())?;
909                Ok(Async::NotReady)
910            }
911            Err(e) => Err(e),
912        }
913    }
914}
915
916impl<'a> AsyncWrite for &'a TcpStream {
917    fn shutdown(&mut self) -> Poll<(), io::Error> {
918        Ok(().into())
919    }
920
921    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
922        if let Async::NotReady = self.io.poll_write_ready()? {
923            return Ok(Async::NotReady);
924        }
925
926        let r = {
927            // The `IoVec` type can't have a zero-length size, so create a dummy
928            // version from a 1-length slice which we'll overwrite with the
929            // `bytes_vec` method.
930            static DUMMY: &[u8] = &[0];
931            let iovec = <&IoVec>::from(DUMMY);
932            let mut bufs = [iovec; 64];
933            let n = buf.bytes_vec(&mut bufs);
934            self.io.get_ref().write_bufs(&bufs[..n])
935        };
936        match r {
937            Ok(n) => {
938                buf.advance(n);
939                Ok(Async::Ready(n))
940            }
941            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
942                self.io.clear_write_ready()?;
943                Ok(Async::NotReady)
944            }
945            Err(e) => Err(e),
946        }
947    }
948}
949
950impl fmt::Debug for TcpStream {
951    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
952        self.io.get_ref().fmt(f)
953    }
954}
955
956impl Future for ConnectFuture {
957    type Item = TcpStream;
958    type Error = io::Error;
959
960    fn poll(&mut self) -> Poll<TcpStream, io::Error> {
961        self.inner.poll()
962    }
963}
964
965impl ConnectFutureState {
966    fn poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error>
967    where
968        F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error>,
969    {
970        {
971            let stream = match *self {
972                ConnectFutureState::Waiting(ref mut s) => s,
973                ConnectFutureState::Error(_) => {
974                    let e = match mem::replace(self, ConnectFutureState::Empty) {
975                        ConnectFutureState::Error(e) => e,
976                        _ => panic!(),
977                    };
978                    return Err(e);
979                }
980                ConnectFutureState::Empty => panic!("can't poll TCP stream twice"),
981            };
982
983            // Once we've connected, wait for the stream to be writable as
984            // that's when the actual connection has been initiated. Once we're
985            // writable we check for `take_socket_error` to see if the connect
986            // actually hit an error or not.
987            //
988            // If all that succeeded then we ship everything on up.
989            if let Async::NotReady = f(&mut stream.io)? {
990                return Ok(Async::NotReady);
991            }
992
993            if let Some(e) = stream.io.get_ref().take_error()? {
994                return Err(e);
995            }
996        }
997
998        match mem::replace(self, ConnectFutureState::Empty) {
999            ConnectFutureState::Waiting(stream) => Ok(Async::Ready(stream)),
1000            _ => panic!(),
1001        }
1002    }
1003}
1004
1005impl Future for ConnectFutureState {
1006    type Item = TcpStream;
1007    type Error = io::Error;
1008
1009    fn poll(&mut self) -> Poll<TcpStream, io::Error> {
1010        self.poll_inner(|io| io.poll_write_ready())
1011    }
1012}
1013
1014#[cfg(unix)]
1015mod sys {
1016    use super::TcpStream;
1017    use std::os::unix::prelude::*;
1018
1019    impl AsRawFd for TcpStream {
1020        fn as_raw_fd(&self) -> RawFd {
1021            self.io.get_ref().as_raw_fd()
1022        }
1023    }
1024}
1025
1026#[cfg(windows)]
1027mod sys {
1028    // TODO: let's land these upstream with mio and then we can add them here.
1029    //
1030    // use std::os::windows::prelude::*;
1031    // use super::TcpStream;
1032    //
1033    // impl AsRawHandle for TcpStream {
1034    //     fn as_raw_handle(&self) -> RawHandle {
1035    //         self.io.get_ref().as_raw_handle()
1036    //     }
1037    // }
1038}