tokio_tcp/stream.rs
1use std::fmt;
2use std::io::{self, Read, Write};
3use std::mem;
4use std::net::{self, Shutdown, SocketAddr};
5use std::time::Duration;
6
7use bytes::{Buf, BufMut};
8use futures::{Async, Future, Poll};
9use iovec::IoVec;
10use mio;
11use tokio_io::{AsyncRead, AsyncWrite};
12use tokio_reactor::{Handle, PollEvented};
13
14/// An I/O object representing a TCP stream connected to a remote endpoint.
15///
16/// A TCP stream can either be created by connecting to an endpoint, via the
17/// [`connect`] method, or by [accepting] a connection from a [listener].
18///
19/// [`connect`]: struct.TcpStream.html#method.connect
20/// [accepting]: struct.TcpListener.html#method.accept
21/// [listener]: struct.TcpListener.html
22///
23/// # Examples
24///
25/// ```
26/// # extern crate tokio;
27/// # extern crate futures;
28/// use futures::Future;
29/// use tokio::io::AsyncWrite;
30/// use tokio::net::TcpStream;
31/// use std::net::SocketAddr;
32///
33/// # fn main() -> Result<(), Box<std::error::Error>> {
34/// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
35/// let stream = TcpStream::connect(&addr);
36/// stream.map(|mut stream| {
37/// // Attempt to write bytes asynchronously to the stream
38/// stream.poll_write(&[1]);
39/// });
40/// # Ok(())
41/// # }
42/// ```
43pub struct TcpStream {
44 io: PollEvented<mio::net::TcpStream>,
45}
46
47/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
48/// when the stream is connected.
49#[must_use = "futures do nothing unless polled"]
50#[derive(Debug)]
51pub struct ConnectFuture {
52 inner: ConnectFutureState,
53}
54
55#[must_use = "futures do nothing unless polled"]
56#[derive(Debug)]
57enum ConnectFutureState {
58 Waiting(TcpStream),
59 Error(io::Error),
60 Empty,
61}
62
63impl TcpStream {
64 /// Create a new TCP stream connected to the specified address.
65 ///
66 /// This function will create a new TCP socket and attempt to connect it to
67 /// the `addr` provided. The returned future will be resolved once the
68 /// stream has successfully connected, or it will return an error if one
69 /// occurs.
70 ///
71 /// # Examples
72 ///
73 /// ```
74 /// # extern crate tokio;
75 /// # extern crate futures;
76 /// use futures::Future;
77 /// use tokio::net::TcpStream;
78 /// use std::net::SocketAddr;
79 ///
80 /// # fn main() -> Result<(), Box<std::error::Error>> {
81 /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
82 /// let stream = TcpStream::connect(&addr)
83 /// .map(|stream|
84 /// println!("successfully connected to {}", stream.local_addr().unwrap()));
85 /// # Ok(())
86 /// # }
87 /// ```
88 pub fn connect(addr: &SocketAddr) -> ConnectFuture {
89 use self::ConnectFutureState::*;
90
91 let inner = match mio::net::TcpStream::connect(addr) {
92 Ok(tcp) => Waiting(TcpStream::new(tcp)),
93 Err(e) => Error(e),
94 };
95
96 ConnectFuture { inner }
97 }
98
99 pub(crate) fn new(connected: mio::net::TcpStream) -> TcpStream {
100 let io = PollEvented::new(connected);
101 TcpStream { io }
102 }
103
104 /// Create a new `TcpStream` from a `net::TcpStream`.
105 ///
106 /// This function will convert a TCP stream created by the standard library
107 /// to a TCP stream ready to be used with the provided event loop handle.
108 /// Use `Handle::default()` to lazily bind to an event loop, just like `connect` does.
109 ///
110 /// # Examples
111 ///
112 /// ```no_run
113 /// # extern crate tokio;
114 /// # extern crate tokio_reactor;
115 /// use tokio::net::TcpStream;
116 /// use std::net::TcpStream as StdTcpStream;
117 /// use tokio_reactor::Handle;
118 ///
119 /// # fn main() -> Result<(), Box<std::error::Error>> {
120 /// let std_stream = StdTcpStream::connect("127.0.0.1:34254")?;
121 /// let stream = TcpStream::from_std(std_stream, &Handle::default())?;
122 /// # Ok(())
123 /// # }
124 /// ```
125 pub fn from_std(stream: net::TcpStream, handle: &Handle) -> io::Result<TcpStream> {
126 let io = mio::net::TcpStream::from_stream(stream)?;
127 let io = PollEvented::new_with_handle(io, handle)?;
128
129 Ok(TcpStream { io })
130 }
131
132 /// Creates a new `TcpStream` from the pending socket inside the given
133 /// `std::net::TcpStream`, connecting it to the address specified.
134 ///
135 /// This constructor allows configuring the socket before it's actually
136 /// connected, and this function will transfer ownership to the returned
137 /// `TcpStream` if successful. An unconnected `TcpStream` can be created
138 /// with the `net2::TcpBuilder` type (and also configured via that route).
139 ///
140 /// The platform specific behavior of this function looks like:
141 ///
142 /// * On Unix, the socket is placed into nonblocking mode and then a
143 /// `connect` call is issued.
144 ///
145 /// * On Windows, the address is stored internally and the connect operation
146 /// is issued when the returned `TcpStream` is registered with an event
147 /// loop. Note that on Windows you must `bind` a socket before it can be
148 /// connected, so if a custom `TcpBuilder` is used it should be bound
149 /// (perhaps to `INADDR_ANY`) before this method is called.
150 pub fn connect_std(
151 stream: net::TcpStream,
152 addr: &SocketAddr,
153 handle: &Handle,
154 ) -> ConnectFuture {
155 use self::ConnectFutureState::*;
156
157 let io = mio::net::TcpStream::connect_stream(stream, addr)
158 .and_then(|io| PollEvented::new_with_handle(io, handle));
159
160 let inner = match io {
161 Ok(io) => Waiting(TcpStream { io }),
162 Err(e) => Error(e),
163 };
164
165 ConnectFuture { inner: inner }
166 }
167
168 /// Check the TCP stream's read readiness state.
169 ///
170 /// The mask argument allows specifying what readiness to notify on. This
171 /// can be any value, including platform specific readiness, **except**
172 /// `writable`. HUP is always implicitly included on platforms that support
173 /// it.
174 ///
175 /// If the resource is not ready for a read then `Async::NotReady` is
176 /// returned and the current task is notified once a new event is received.
177 ///
178 /// The stream will remain in a read-ready state until calls to `poll_read`
179 /// return `NotReady`.
180 ///
181 /// # Panics
182 ///
183 /// This function panics if:
184 ///
/// * `mask` includes writable.
186 /// * called from outside of a task context.
187 ///
188 /// # Examples
189 ///
190 /// ```
191 /// # extern crate mio;
192 /// # extern crate tokio;
193 /// # extern crate futures;
194 /// use mio::Ready;
195 /// use futures::Async;
196 /// use futures::Future;
197 /// use tokio::net::TcpStream;
198 /// use std::net::SocketAddr;
199 ///
200 /// # fn main() -> Result<(), Box<std::error::Error>> {
201 /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
202 /// let stream = TcpStream::connect(&addr);
203 /// stream.map(|stream| {
204 /// match stream.poll_read_ready(Ready::readable()) {
205 /// Ok(Async::Ready(_)) => println!("read ready"),
206 /// Ok(Async::NotReady) => println!("not read ready"),
207 /// Err(e) => eprintln!("got error: {}", e),
208 /// }
209 /// });
210 /// # Ok(())
211 /// # }
212 /// ```
213 pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> {
214 self.io.poll_read_ready(mask)
215 }
216
217 /// Check the TCP stream's write readiness state.
218 ///
219 /// This always checks for writable readiness and also checks for HUP
220 /// readiness on platforms that support it.
221 ///
222 /// If the resource is not ready for a write then `Async::NotReady` is
223 /// returned and the current task is notified once a new event is received.
224 ///
225 /// The I/O resource will remain in a write-ready state until calls to
226 /// `poll_write` return `NotReady`.
227 ///
228 /// # Panics
229 ///
230 /// This function panics if called from outside of a task context.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// # extern crate tokio;
236 /// # extern crate futures;
237 /// use futures::Async;
238 /// use futures::Future;
239 /// use tokio::net::TcpStream;
240 /// use std::net::SocketAddr;
241 ///
242 /// # fn main() -> Result<(), Box<std::error::Error>> {
243 /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
244 /// let stream = TcpStream::connect(&addr);
245 /// stream.map(|stream| {
246 /// match stream.poll_write_ready() {
247 /// Ok(Async::Ready(_)) => println!("write ready"),
248 /// Ok(Async::NotReady) => println!("not write ready"),
249 /// Err(e) => eprintln!("got error: {}", e),
250 /// }
251 /// });
252 /// # Ok(())
253 /// # }
254 /// ```
255 pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
256 self.io.poll_write_ready()
257 }
258
259 /// Returns the local address that this stream is bound to.
260 ///
261 /// # Examples
262 ///
263 /// ```
264 /// # extern crate tokio;
265 /// # extern crate futures;
266 /// use tokio::net::TcpStream;
267 /// use futures::Future;
268 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
269 ///
270 /// # fn main() -> Result<(), Box<std::error::Error>> {
271 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
272 /// let stream = TcpStream::connect(&addr);
273 /// stream.map(|stream| {
274 /// assert_eq!(stream.local_addr().unwrap(),
275 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080)));
276 /// });
277 /// # Ok(())
278 /// # }
279 /// ```
280 pub fn local_addr(&self) -> io::Result<SocketAddr> {
281 self.io.get_ref().local_addr()
282 }
283
/// Returns the remote address that this stream is connected to.
///
/// # Examples
286 ///
287 /// ```
288 /// # extern crate tokio;
289 /// # extern crate futures;
290 /// use tokio::net::TcpStream;
291 /// use futures::Future;
292 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
293 ///
294 /// # fn main() -> Result<(), Box<std::error::Error>> {
295 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
296 /// let stream = TcpStream::connect(&addr);
297 /// stream.map(|stream| {
298 /// assert_eq!(stream.peer_addr().unwrap(),
299 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080)));
300 /// });
301 /// # Ok(())
302 /// # }
303 /// ```
304 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
305 self.io.get_ref().peer_addr()
306 }
307
308 #[deprecated(since = "0.1.2", note = "use poll_peek instead")]
309 #[doc(hidden)]
310 pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
311 match self.poll_peek(buf)? {
312 Async::Ready(n) => Ok(n),
313 Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
314 }
315 }
316
317 /// Receives data on the socket from the remote address to which it is
318 /// connected, without removing that data from the queue. On success,
319 /// returns the number of bytes peeked.
320 ///
321 /// Successive calls return the same data. This is accomplished by passing
322 /// `MSG_PEEK` as a flag to the underlying recv system call.
323 ///
324 /// # Return
325 ///
326 /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
327 ///
328 /// If no data is available for reading, the method returns
329 /// `Ok(Async::NotReady)` and arranges for the current task to receive a
330 /// notification when the socket becomes readable or is closed.
331 ///
332 /// # Panics
333 ///
334 /// This function will panic if called from outside of a task context.
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// # extern crate tokio;
340 /// # extern crate futures;
341 /// use tokio::net::TcpStream;
342 /// use futures::Async;
343 /// use futures::Future;
344 /// use std::net::SocketAddr;
345 ///
346 /// # fn main() -> Result<(), Box<std::error::Error>> {
347 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
348 /// let stream = TcpStream::connect(&addr);
349 /// stream.map(|mut stream| {
350 /// let mut buf = [0; 10];
351 /// match stream.poll_peek(&mut buf) {
352 /// Ok(Async::Ready(len)) => println!("read {} bytes", len),
353 /// Ok(Async::NotReady) => println!("no data available"),
354 /// Err(e) => eprintln!("got error: {}", e),
355 /// }
356 /// });
357 /// # Ok(())
358 /// # }
359 /// ```
360 pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
361 try_ready!(self.io.poll_read_ready(mio::Ready::readable()));
362
363 match self.io.get_ref().peek(buf) {
364 Ok(ret) => Ok(ret.into()),
365 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
366 self.io.clear_read_ready(mio::Ready::readable())?;
367 Ok(Async::NotReady)
368 }
369 Err(e) => Err(e),
370 }
371 }
372
373 /// Shuts down the read, write, or both halves of this connection.
374 ///
375 /// This function will cause all pending and future I/O on the specified
376 /// portions to return immediately with an appropriate value (see the
377 /// documentation of `Shutdown`).
378 ///
379 /// # Examples
380 ///
381 /// ```
382 /// # extern crate tokio;
383 /// # extern crate futures;
384 /// use tokio::net::TcpStream;
385 /// use futures::Future;
386 /// use std::net::{Shutdown, SocketAddr};
387 ///
388 /// # fn main() -> Result<(), Box<std::error::Error>> {
389 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
390 /// let stream = TcpStream::connect(&addr);
391 /// stream.map(|stream| {
392 /// stream.shutdown(Shutdown::Both)
393 /// });
394 /// # Ok(())
395 /// # }
396 /// ```
397 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
398 self.io.get_ref().shutdown(how)
399 }
400
401 /// Gets the value of the `TCP_NODELAY` option on this socket.
402 ///
403 /// For more information about this option, see [`set_nodelay`].
404 ///
405 /// [`set_nodelay`]: #method.set_nodelay
406 ///
407 /// # Examples
408 ///
409 /// ```
410 /// # extern crate tokio;
411 /// # extern crate futures;
412 /// use tokio::net::TcpStream;
413 /// use futures::Future;
414 /// use std::net::SocketAddr;
415 ///
416 /// # fn main() -> Result<(), Box<std::error::Error>> {
417 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
418 /// let stream = TcpStream::connect(&addr);
419 /// stream.map(|stream| {
/// stream.set_nodelay(true).expect("set_nodelay call failed");
421 /// assert_eq!(stream.nodelay().unwrap_or(false), true);
422 /// });
423 /// # Ok(())
424 /// # }
425 /// ```
426 pub fn nodelay(&self) -> io::Result<bool> {
427 self.io.get_ref().nodelay()
428 }
429
430 /// Sets the value of the `TCP_NODELAY` option on this socket.
431 ///
432 /// If set, this option disables the Nagle algorithm. This means that
433 /// segments are always sent as soon as possible, even if there is only a
434 /// small amount of data. When not set, data is buffered until there is a
435 /// sufficient amount to send out, thereby avoiding the frequent sending of
436 /// small packets.
437 ///
438 /// # Examples
439 ///
440 /// ```
441 /// # extern crate tokio;
442 /// # extern crate futures;
443 /// use tokio::net::TcpStream;
444 /// use futures::Future;
445 /// use std::net::SocketAddr;
446 ///
447 /// # fn main() -> Result<(), Box<std::error::Error>> {
448 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
449 /// let stream = TcpStream::connect(&addr);
450 /// stream.map(|stream| {
451 /// stream.set_nodelay(true).expect("set_nodelay call failed");
452 /// });
453 /// # Ok(())
454 /// # }
455 /// ```
456 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
457 self.io.get_ref().set_nodelay(nodelay)
458 }
459
460 /// Gets the value of the `SO_RCVBUF` option on this socket.
461 ///
462 /// For more information about this option, see [`set_recv_buffer_size`].
463 ///
/// [`set_recv_buffer_size`]: #method.set_recv_buffer_size
465 ///
466 /// # Examples
467 ///
468 /// ```
469 /// # extern crate tokio;
470 /// # extern crate futures;
471 /// use tokio::net::TcpStream;
472 /// use futures::Future;
473 /// use std::net::SocketAddr;
474 ///
475 /// # fn main() -> Result<(), Box<std::error::Error>> {
476 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
477 /// let stream = TcpStream::connect(&addr);
478 /// stream.map(|stream| {
479 /// stream.set_recv_buffer_size(100).expect("set_recv_buffer_size failed");
480 /// assert_eq!(stream.recv_buffer_size().unwrap_or(0), 100);
481 /// });
482 /// # Ok(())
483 /// # }
484 /// ```
485 pub fn recv_buffer_size(&self) -> io::Result<usize> {
486 self.io.get_ref().recv_buffer_size()
487 }
488
489 /// Sets the value of the `SO_RCVBUF` option on this socket.
490 ///
491 /// Changes the size of the operating system's receive buffer associated
492 /// with the socket.
493 ///
494 /// # Examples
495 ///
496 /// ```
497 /// # extern crate tokio;
498 /// # extern crate futures;
499 /// use tokio::net::TcpStream;
500 /// use futures::Future;
501 /// use std::net::SocketAddr;
502 ///
503 /// # fn main() -> Result<(), Box<std::error::Error>> {
504 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
505 /// let stream = TcpStream::connect(&addr);
506 /// stream.map(|stream| {
507 /// stream.set_recv_buffer_size(100).expect("set_recv_buffer_size failed");
508 /// });
509 /// # Ok(())
510 /// # }
511 /// ```
512 pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
513 self.io.get_ref().set_recv_buffer_size(size)
514 }
515
516 /// Gets the value of the `SO_SNDBUF` option on this socket.
517 ///
/// For more information about this option, see [`set_send_buffer_size`].
///
/// [`set_send_buffer_size`]: #method.set_send_buffer_size
521 ///
522 /// # Examples
523 ///
524 /// ```
525 /// # extern crate tokio;
526 /// # extern crate futures;
527 /// use tokio::net::TcpStream;
528 /// use futures::Future;
529 /// use std::net::SocketAddr;
530 ///
531 /// # fn main() -> Result<(), Box<std::error::Error>> {
532 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
533 /// let stream = TcpStream::connect(&addr);
534 /// stream.map(|stream| {
535 /// stream.set_send_buffer_size(100).expect("set_send_buffer_size failed");
536 /// assert_eq!(stream.send_buffer_size().unwrap_or(0), 100);
537 /// });
538 /// # Ok(())
539 /// # }
540 /// ```
541 pub fn send_buffer_size(&self) -> io::Result<usize> {
542 self.io.get_ref().send_buffer_size()
543 }
544
545 /// Sets the value of the `SO_SNDBUF` option on this socket.
546 ///
547 /// Changes the size of the operating system's send buffer associated with
548 /// the socket.
549 ///
550 /// # Examples
551 ///
552 /// ```
553 /// # extern crate tokio;
554 /// # extern crate futures;
555 /// use tokio::net::TcpStream;
556 /// use futures::Future;
557 /// use std::net::SocketAddr;
558 ///
559 /// # fn main() -> Result<(), Box<std::error::Error>> {
560 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
561 /// let stream = TcpStream::connect(&addr);
562 /// stream.map(|stream| {
563 /// stream.set_send_buffer_size(100).expect("set_send_buffer_size failed");
564 /// });
565 /// # Ok(())
566 /// # }
567 /// ```
568 pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
569 self.io.get_ref().set_send_buffer_size(size)
570 }
571
572 /// Returns whether keepalive messages are enabled on this socket, and if so
573 /// the duration of time between them.
574 ///
575 /// For more information about this option, see [`set_keepalive`].
576 ///
/// [`set_keepalive`]: #method.set_keepalive
578 ///
579 /// # Examples
580 ///
581 /// ```
582 /// # extern crate tokio;
583 /// # extern crate futures;
584 /// use tokio::net::TcpStream;
585 /// use futures::Future;
586 /// use std::net::SocketAddr;
587 ///
588 /// # fn main() -> Result<(), Box<std::error::Error>> {
589 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
590 /// let stream = TcpStream::connect(&addr);
591 /// stream.map(|stream| {
592 /// stream.set_keepalive(None).expect("set_keepalive failed");
593 /// assert_eq!(stream.keepalive().unwrap(), None);
594 /// });
595 /// # Ok(())
596 /// # }
597 /// ```
598 pub fn keepalive(&self) -> io::Result<Option<Duration>> {
599 self.io.get_ref().keepalive()
600 }
601
602 /// Sets whether keepalive messages are enabled to be sent on this socket.
603 ///
604 /// On Unix, this option will set the `SO_KEEPALIVE` as well as the
605 /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform).
606 /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option.
607 ///
608 /// If `None` is specified then keepalive messages are disabled, otherwise
609 /// the duration specified will be the time to remain idle before sending a
610 /// TCP keepalive probe.
611 ///
612 /// Some platforms specify this value in seconds, so sub-second
613 /// specifications may be omitted.
614 ///
615 /// # Examples
616 ///
617 /// ```
618 /// # extern crate tokio;
619 /// # extern crate futures;
620 /// use tokio::net::TcpStream;
621 /// use futures::Future;
622 /// use std::net::SocketAddr;
623 ///
624 /// # fn main() -> Result<(), Box<std::error::Error>> {
625 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
626 /// let stream = TcpStream::connect(&addr);
627 /// stream.map(|stream| {
628 /// stream.set_keepalive(None).expect("set_keepalive failed");
629 /// });
630 /// # Ok(())
631 /// # }
632 /// ```
633 pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
634 self.io.get_ref().set_keepalive(keepalive)
635 }
636
637 /// Gets the value of the `IP_TTL` option for this socket.
638 ///
639 /// For more information about this option, see [`set_ttl`].
640 ///
/// [`set_ttl`]: #method.set_ttl
642 ///
643 /// # Examples
644 ///
645 /// ```
646 /// # extern crate tokio;
647 /// # extern crate futures;
648 /// use tokio::net::TcpStream;
649 /// use futures::Future;
650 /// use std::net::SocketAddr;
651 ///
652 /// # fn main() -> Result<(), Box<std::error::Error>> {
653 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
654 /// let stream = TcpStream::connect(&addr);
655 /// stream.map(|stream| {
656 /// stream.set_ttl(100).expect("set_ttl failed");
657 /// assert_eq!(stream.ttl().unwrap_or(0), 100);
658 /// });
659 /// # Ok(())
660 /// # }
661 /// ```
662 pub fn ttl(&self) -> io::Result<u32> {
663 self.io.get_ref().ttl()
664 }
665
666 /// Sets the value for the `IP_TTL` option on this socket.
667 ///
668 /// This value sets the time-to-live field that is used in every packet sent
669 /// from this socket.
670 ///
671 /// # Examples
672 ///
673 /// ```
674 /// # extern crate tokio;
675 /// # extern crate futures;
676 /// use tokio::net::TcpStream;
677 /// use futures::Future;
678 /// use std::net::SocketAddr;
679 ///
680 /// # fn main() -> Result<(), Box<std::error::Error>> {
681 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
682 /// let stream = TcpStream::connect(&addr);
683 /// stream.map(|stream| {
684 /// stream.set_ttl(100).expect("set_ttl failed");
685 /// });
686 /// # Ok(())
687 /// # }
688 /// ```
689 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
690 self.io.get_ref().set_ttl(ttl)
691 }
692
693 /// Reads the linger duration for this socket by getting the `SO_LINGER`
694 /// option.
695 ///
696 /// For more information about this option, see [`set_linger`].
697 ///
/// [`set_linger`]: #method.set_linger
699 ///
700 /// # Examples
701 ///
702 /// ```
703 /// # extern crate tokio;
704 /// # extern crate futures;
705 /// use tokio::net::TcpStream;
706 /// use futures::Future;
707 /// use std::net::SocketAddr;
708 ///
709 /// # fn main() -> Result<(), Box<std::error::Error>> {
710 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
711 /// let stream = TcpStream::connect(&addr);
712 /// stream.map(|stream| {
713 /// stream.set_linger(None).expect("set_linger failed");
714 /// assert_eq!(stream.linger().unwrap(), None);
715 /// });
716 /// # Ok(())
717 /// # }
718 /// ```
719 pub fn linger(&self) -> io::Result<Option<Duration>> {
720 self.io.get_ref().linger()
721 }
722
723 /// Sets the linger duration of this socket by setting the `SO_LINGER`
724 /// option.
725 ///
726 /// This option controls the action taken when a stream has unsent messages
727 /// and the stream is closed. If `SO_LINGER` is set, the system
728 /// shall block the process until it can transmit the data or until the
729 /// time expires.
730 ///
731 /// If `SO_LINGER` is not specified, and the stream is closed, the system
732 /// handles the call in a way that allows the process to continue as quickly
733 /// as possible.
734 ///
735 /// # Examples
736 ///
737 /// ```
738 /// # extern crate tokio;
739 /// # extern crate futures;
740 /// use tokio::net::TcpStream;
741 /// use futures::Future;
742 /// use std::net::SocketAddr;
743 ///
744 /// # fn main() -> Result<(), Box<std::error::Error>> {
745 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
746 /// let stream = TcpStream::connect(&addr);
747 /// stream.map(|stream| {
748 /// stream.set_linger(None).expect("set_linger failed");
749 /// });
750 /// # Ok(())
751 /// # }
752 /// ```
753 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
754 self.io.get_ref().set_linger(dur)
755 }
756
/// Creates a new independently owned handle to the underlying socket.
///
/// The returned `TcpStream` is a reference to the same stream that this
/// object references. Both handles will read and write the same stream of
/// data, and options set on one stream will be propagated to the other
/// stream.
///
/// **Note:** this method is deprecated and no longer clones the socket; it
/// always returns an error of kind `io::ErrorKind::Other`.
763 ///
764 /// # Examples
765 ///
766 /// ```
767 /// # extern crate tokio;
768 /// # extern crate futures;
769 /// use tokio::net::TcpStream;
770 /// use futures::Future;
771 /// use std::net::SocketAddr;
772 ///
773 /// # fn main() -> Result<(), Box<std::error::Error>> {
774 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
775 /// let stream = TcpStream::connect(&addr);
776 /// stream.map(|stream| {
777 /// let clone = stream.try_clone().unwrap();
778 /// });
779 /// # Ok(())
780 /// # }
781 /// ```
782 #[deprecated(since = "0.1.14", note = "use `split()` instead")]
783 #[doc(hidden)]
784 pub fn try_clone(&self) -> io::Result<TcpStream> {
785 // Rationale for deprecation:
786 // - https://github.com/tokio-rs/tokio/pull/824
787 // - https://github.com/tokio-rs/tokio/issues/774#issuecomment-451059317
788 let msg = "`TcpStream::try_clone()` is deprecated because it doesn't work as intended";
789 Err(io::Error::new(io::ErrorKind::Other, msg))
790 }
791}
792
793// ===== impl Read / Write =====
794
795impl Read for TcpStream {
796 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
797 self.io.read(buf)
798 }
799}
800
801impl Write for TcpStream {
802 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
803 self.io.write(buf)
804 }
805 fn flush(&mut self) -> io::Result<()> {
806 Ok(())
807 }
808}
809
810impl AsyncRead for TcpStream {
811 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
812 false
813 }
814
815 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
816 <&TcpStream>::read_buf(&mut &*self, buf)
817 }
818}
819
820impl AsyncWrite for TcpStream {
821 fn shutdown(&mut self) -> Poll<(), io::Error> {
822 <&TcpStream>::shutdown(&mut &*self)
823 }
824
825 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
826 <&TcpStream>::write_buf(&mut &*self, buf)
827 }
828}
829
830// ===== impl Read / Write for &'a =====
831
832impl<'a> Read for &'a TcpStream {
833 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
834 (&self.io).read(buf)
835 }
836}
837
838impl<'a> Write for &'a TcpStream {
839 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
840 (&self.io).write(buf)
841 }
842
843 fn flush(&mut self) -> io::Result<()> {
844 (&self.io).flush()
845 }
846}
847
848impl<'a> AsyncRead for &'a TcpStream {
849 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
850 false
851 }
852
853 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
854 if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? {
855 return Ok(Async::NotReady);
856 }
857
858 let r = unsafe {
859 // The `IoVec` type can't have a 0-length size, so we create a bunch
860 // of dummy versions on the stack with 1 length which we'll quickly
861 // overwrite.
862 let b1: &mut [u8] = &mut [0];
863 let b2: &mut [u8] = &mut [0];
864 let b3: &mut [u8] = &mut [0];
865 let b4: &mut [u8] = &mut [0];
866 let b5: &mut [u8] = &mut [0];
867 let b6: &mut [u8] = &mut [0];
868 let b7: &mut [u8] = &mut [0];
869 let b8: &mut [u8] = &mut [0];
870 let b9: &mut [u8] = &mut [0];
871 let b10: &mut [u8] = &mut [0];
872 let b11: &mut [u8] = &mut [0];
873 let b12: &mut [u8] = &mut [0];
874 let b13: &mut [u8] = &mut [0];
875 let b14: &mut [u8] = &mut [0];
876 let b15: &mut [u8] = &mut [0];
877 let b16: &mut [u8] = &mut [0];
878 let mut bufs: [&mut IoVec; 16] = [
879 b1.into(),
880 b2.into(),
881 b3.into(),
882 b4.into(),
883 b5.into(),
884 b6.into(),
885 b7.into(),
886 b8.into(),
887 b9.into(),
888 b10.into(),
889 b11.into(),
890 b12.into(),
891 b13.into(),
892 b14.into(),
893 b15.into(),
894 b16.into(),
895 ];
896 let n = buf.bytes_vec_mut(&mut bufs);
897 self.io.get_ref().read_bufs(&mut bufs[..n])
898 };
899
900 match r {
901 Ok(n) => {
902 unsafe {
903 buf.advance_mut(n);
904 }
905 Ok(Async::Ready(n))
906 }
907 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
908 self.io.clear_read_ready(mio::Ready::readable())?;
909 Ok(Async::NotReady)
910 }
911 Err(e) => Err(e),
912 }
913 }
914}
915
916impl<'a> AsyncWrite for &'a TcpStream {
917 fn shutdown(&mut self) -> Poll<(), io::Error> {
918 Ok(().into())
919 }
920
921 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
922 if let Async::NotReady = self.io.poll_write_ready()? {
923 return Ok(Async::NotReady);
924 }
925
926 let r = {
927 // The `IoVec` type can't have a zero-length size, so create a dummy
928 // version from a 1-length slice which we'll overwrite with the
929 // `bytes_vec` method.
930 static DUMMY: &[u8] = &[0];
931 let iovec = <&IoVec>::from(DUMMY);
932 let mut bufs = [iovec; 64];
933 let n = buf.bytes_vec(&mut bufs);
934 self.io.get_ref().write_bufs(&bufs[..n])
935 };
936 match r {
937 Ok(n) => {
938 buf.advance(n);
939 Ok(Async::Ready(n))
940 }
941 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
942 self.io.clear_write_ready()?;
943 Ok(Async::NotReady)
944 }
945 Err(e) => Err(e),
946 }
947 }
948}
949
950impl fmt::Debug for TcpStream {
951 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
952 self.io.get_ref().fmt(f)
953 }
954}
955
956impl Future for ConnectFuture {
957 type Item = TcpStream;
958 type Error = io::Error;
959
960 fn poll(&mut self) -> Poll<TcpStream, io::Error> {
961 self.inner.poll()
962 }
963}
964
965impl ConnectFutureState {
966 fn poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error>
967 where
968 F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error>,
969 {
970 {
971 let stream = match *self {
972 ConnectFutureState::Waiting(ref mut s) => s,
973 ConnectFutureState::Error(_) => {
974 let e = match mem::replace(self, ConnectFutureState::Empty) {
975 ConnectFutureState::Error(e) => e,
976 _ => panic!(),
977 };
978 return Err(e);
979 }
980 ConnectFutureState::Empty => panic!("can't poll TCP stream twice"),
981 };
982
983 // Once we've connected, wait for the stream to be writable as
984 // that's when the actual connection has been initiated. Once we're
985 // writable we check for `take_socket_error` to see if the connect
986 // actually hit an error or not.
987 //
988 // If all that succeeded then we ship everything on up.
989 if let Async::NotReady = f(&mut stream.io)? {
990 return Ok(Async::NotReady);
991 }
992
993 if let Some(e) = stream.io.get_ref().take_error()? {
994 return Err(e);
995 }
996 }
997
998 match mem::replace(self, ConnectFutureState::Empty) {
999 ConnectFutureState::Waiting(stream) => Ok(Async::Ready(stream)),
1000 _ => panic!(),
1001 }
1002 }
1003}
1004
1005impl Future for ConnectFutureState {
1006 type Item = TcpStream;
1007 type Error = io::Error;
1008
1009 fn poll(&mut self) -> Poll<TcpStream, io::Error> {
1010 self.poll_inner(|io| io.poll_write_ready())
1011 }
1012}
1013
1014#[cfg(unix)]
1015mod sys {
1016 use super::TcpStream;
1017 use std::os::unix::prelude::*;
1018
1019 impl AsRawFd for TcpStream {
1020 fn as_raw_fd(&self) -> RawFd {
1021 self.io.get_ref().as_raw_fd()
1022 }
1023 }
1024}
1025
#[cfg(windows)]
mod sys {
    // TODO: `AsRawHandle` is not implemented for `TcpStream` yet. The plan is
    // to land the required support upstream in mio first and then add the
    // impl sketched below.
    //
    // use std::os::windows::prelude::*;
    // use super::TcpStream;
    //
    // impl AsRawHandle for TcpStream {
    //     fn as_raw_handle(&self) -> RawHandle {
    //         self.io.get_ref().as_raw_handle()
    //     }
    // }
}
1038}