tokio_core/net/
tcp.rs

1use std::fmt;
2use std::io::{self, Read, Write};
3use std::mem;
4use std::net::{self, SocketAddr, Shutdown};
5use std::time::Duration;
6
7use bytes::{Buf, BufMut};
8use futures::stream::Stream;
9use futures::{Future, Poll, Async};
10use iovec::IoVec;
11use mio;
12use tokio_io::{AsyncRead, AsyncWrite};
13
14use reactor::{Handle, PollEvented2};
15
16/// An I/O object representing a TCP socket listening for incoming connections.
17///
18/// This object can be converted into a stream of incoming connections for
19/// various forms of processing.
20pub struct TcpListener {
21    io: PollEvented2<mio::net::TcpListener>,
22}
23
24/// Stream returned by the `TcpListener::incoming` function representing the
25/// stream of sockets received from a listener.
26#[must_use = "streams do nothing unless polled"]
27pub struct Incoming {
28    inner: TcpListener,
29}
30
31impl TcpListener {
32    /// Create a new TCP listener associated with this event loop.
33    ///
34    /// The TCP listener will bind to the provided `addr` address, if available.
35    /// If the result is `Ok`, the socket has successfully bound.
36    pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
37        let l = try!(mio::net::TcpListener::bind(addr));
38        TcpListener::new(l, handle)
39    }
40
41    /// Create a new TCP listener associated with this event loop.
42    ///
43    /// This is the same as `bind` but uses the default reactor instead of an
44    /// explicit `&Handle`.
45    pub fn bind2(addr: &SocketAddr) -> io::Result<TcpListener> {
46        let l = try!(mio::net::TcpListener::bind(addr));
47        TcpListener::new2(l)
48    }
49
50    /// Attempt to accept a connection and create a new connected `TcpStream` if
51    /// successful.
52    ///
53    /// This function will attempt an accept operation, but will not block
54    /// waiting for it to complete. If the operation would block then a "would
55    /// block" error is returned. Additionally, if this method would block, it
56    /// registers the current task to receive a notification when it would
57    /// otherwise not block.
58    ///
59    /// Note that typically for simple usage it's easier to treat incoming
60    /// connections as a `Stream` of `TcpStream`s with the `incoming` method
61    /// below.
62    ///
63    /// # Panics
64    ///
65    /// This function will panic if it is called outside the context of a
66    /// future's task. It's recommended to only call this from the
67    /// implementation of a `Future::poll`, if necessary.
68    pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
69        let (io, addr) = self.accept_std()?;
70
71        let io = mio::net::TcpStream::from_stream(io)?;
72        let io = PollEvented2::new(io);
73        let io = TcpStream { io };
74
75        Ok((io, addr))
76    }
77
78    /// Like `accept`, except that it returns a raw `std::net::TcpStream`.
79    ///
80    /// The stream is *in blocking mode*, and is not associated with the Tokio
81    /// event loop.
82    pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> {
83        if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? {
84            return Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready"))
85        }
86
87        match self.io.get_ref().accept_std() {
88            Err(e) => {
89                if e.kind() == io::ErrorKind::WouldBlock {
90                    self.io.clear_read_ready(mio::Ready::readable())?;
91                }
92                Err(e)
93            },
94            Ok((sock, addr)) => Ok((sock, addr)),
95        }
96    }
97
98    /// Create a new TCP listener from the standard library's TCP listener.
99    ///
100    /// This method can be used when the `Handle::tcp_listen` method isn't
101    /// sufficient because perhaps some more configuration is needed in terms of
102    /// before the calls to `bind` and `listen`.
103    ///
104    /// This API is typically paired with the `net2` crate and the `TcpBuilder`
105    /// type to build up and customize a listener before it's shipped off to the
106    /// backing event loop. This allows configuration of options like
107    /// `SO_REUSEPORT`, binding to multiple addresses, etc.
108    ///
109    /// The `addr` argument here is one of the addresses that `listener` is
110    /// bound to and the listener will only be guaranteed to accept connections
111    /// of the same address type currently.
112    ///
113    /// Finally, the `handle` argument is the event loop that this listener will
114    /// be bound to.
115    ///
116    /// The platform specific behavior of this function looks like:
117    ///
118    /// * On Unix, the socket is placed into nonblocking mode and connections
119    ///   can be accepted as normal
120    ///
121    /// * On Windows, the address is stored internally and all future accepts
122    ///   will only be for the same IP version as `addr` specified. That is, if
123    ///   `addr` is an IPv4 address then all sockets accepted will be IPv4 as
124    ///   well (same for IPv6).
125    pub fn from_listener(listener: net::TcpListener,
126                         _addr: &SocketAddr,
127                         handle: &Handle) -> io::Result<TcpListener> {
128        let l = try!(mio::net::TcpListener::from_std(listener));
129        TcpListener::new(l, handle)
130    }
131
132    fn new(listener: mio::net::TcpListener, handle: &Handle)
133           -> io::Result<TcpListener> {
134        let io = try!(PollEvented2::new_with_handle(listener, handle.new_tokio_handle()));
135        Ok(TcpListener { io: io })
136    }
137
138    fn new2(listener: mio::net::TcpListener)
139           -> io::Result<TcpListener> {
140        let io = PollEvented2::new(listener);
141        Ok(TcpListener { io: io })
142    }
143
144    /// Test whether this socket is ready to be read or not.
145    pub fn poll_read(&self) -> Async<()> {
146        self.io.poll_read_ready(mio::Ready::readable())
147            .map(|r| {
148                if r.is_ready() {
149                    Async::Ready(())
150                } else {
151                    Async::NotReady
152                }
153            })
154            .unwrap_or(().into())
155    }
156
157    /// Returns the local address that this listener is bound to.
158    ///
159    /// This can be useful, for example, when binding to port 0 to figure out
160    /// which port was actually bound.
161    pub fn local_addr(&self) -> io::Result<SocketAddr> {
162        self.io.get_ref().local_addr()
163    }
164
165    /// Consumes this listener, returning a stream of the sockets this listener
166    /// accepts.
167    ///
168    /// This method returns an implementation of the `Stream` trait which
169    /// resolves to the sockets the are accepted on this listener.
170    pub fn incoming(self) -> Incoming {
171        Incoming { inner: self }
172    }
173
174    /// Sets the value for the `IP_TTL` option on this socket.
175    ///
176    /// This value sets the time-to-live field that is used in every packet sent
177    /// from this socket.
178    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
179        self.io.get_ref().set_ttl(ttl)
180    }
181
182    /// Gets the value of the `IP_TTL` option for this socket.
183    ///
184    /// For more information about this option, see [`set_ttl`][link].
185    ///
186    /// [link]: #method.set_ttl
187    pub fn ttl(&self) -> io::Result<u32> {
188        self.io.get_ref().ttl()
189    }
190
191    /// Sets the value for the `IPV6_V6ONLY` option on this socket.
192    ///
193    /// If this is set to `true` then the socket is restricted to sending and
194    /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
195    /// can bind the same port at the same time.
196    ///
197    /// If this is set to `false` then the socket can be used to send and
198    /// receive packets from an IPv4-mapped IPv6 address.
199    pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
200        self.io.get_ref().set_only_v6(only_v6)
201    }
202
203    /// Gets the value of the `IPV6_V6ONLY` option for this socket.
204    ///
205    /// For more information about this option, see [`set_only_v6`][link].
206    ///
207    /// [link]: #method.set_only_v6
208    pub fn only_v6(&self) -> io::Result<bool> {
209        self.io.get_ref().only_v6()
210    }
211}
212
213impl fmt::Debug for TcpListener {
214    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
215        self.io.get_ref().fmt(f)
216    }
217}
218
219impl Stream for Incoming {
220    type Item = (TcpStream, SocketAddr);
221    type Error = io::Error;
222
223    fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
224        Ok(Async::Ready(Some(try_nb!(self.inner.accept()))))
225    }
226}
227
228/// An I/O object representing a TCP stream connected to a remote endpoint.
229///
230/// A TCP stream can either be created by connecting to an endpoint or by
231/// accepting a connection from a listener. Inside the stream is access to the
232/// raw underlying I/O object as well as streams for the read/write
233/// notifications on the stream itself.
234pub struct TcpStream {
235    io: PollEvented2<mio::net::TcpStream>,
236}
237
238/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
239/// when the stream is connected.
240#[must_use = "futures do nothing unless polled"]
241pub struct TcpStreamNew {
242    inner: TcpStreamNewState,
243}
244
245#[must_use = "futures do nothing unless polled"]
246enum TcpStreamNewState {
247    Waiting(TcpStream),
248    Error(io::Error),
249    Empty,
250}
251
252impl TcpStream {
253    /// Create a new TCP stream connected to the specified address.
254    ///
255    /// This function will create a new TCP socket and attempt to connect it to
256    /// the `addr` provided. The returned future will be resolved once the
257    /// stream has successfully connected. If an error happens during the
258    /// connection or during the socket creation, that error will be returned to
259    /// the future instead.
260    pub fn connect(addr: &SocketAddr, handle: &Handle) -> TcpStreamNew {
261        let inner = match mio::net::TcpStream::connect(addr) {
262            Ok(tcp) => TcpStream::new(tcp, handle),
263            Err(e) => TcpStreamNewState::Error(e),
264        };
265        TcpStreamNew { inner: inner }
266    }
267
268    /// Create a new TCP stream connected to the specified address.
269    ///
270    /// This is the same as `connect`, but uses the default reactor instead of
271    /// taking an explicit `&Handle`.
272    pub fn connect2(addr: &SocketAddr) -> TcpStreamNew {
273        let inner = match mio::net::TcpStream::connect(addr) {
274            Ok(tcp) => TcpStream::new2(tcp),
275            Err(e) => TcpStreamNewState::Error(e),
276        };
277        TcpStreamNew { inner: inner }
278    }
279
280    fn new(connected_stream: mio::net::TcpStream, handle: &Handle)
281           -> TcpStreamNewState {
282        match PollEvented2::new_with_handle(connected_stream, handle.new_tokio_handle()) {
283            Ok(io) => TcpStreamNewState::Waiting(TcpStream { io: io }),
284            Err(e) => TcpStreamNewState::Error(e),
285        }
286    }
287
288    fn new2(connected_stream: mio::net::TcpStream)
289           -> TcpStreamNewState {
290        let io = PollEvented2::new(connected_stream);
291        TcpStreamNewState::Waiting(TcpStream { io: io })
292    }
293
294    /// Create a new `TcpStream` from a `net::TcpStream`.
295    ///
296    /// This function will convert a TCP stream in the standard library to a TCP
297    /// stream ready to be used with the provided event loop handle. The object
298    /// returned is associated with the event loop and ready to perform I/O.
299    pub fn from_stream(stream: net::TcpStream, handle: &Handle)
300                       -> io::Result<TcpStream> {
301        let inner = try!(mio::net::TcpStream::from_stream(stream));
302        Ok(TcpStream {
303            io: try!(PollEvented2::new_with_handle(inner, handle.new_tokio_handle())),
304        })
305    }
306
307    /// Creates a new `TcpStream` from the pending socket inside the given
308    /// `std::net::TcpStream`, connecting it to the address specified.
309    ///
310    /// This constructor allows configuring the socket before it's actually
311    /// connected, and this function will transfer ownership to the returned
312    /// `TcpStream` if successful. An unconnected `TcpStream` can be created
313    /// with the `net2::TcpBuilder` type (and also configured via that route).
314    ///
315    /// The platform specific behavior of this function looks like:
316    ///
317    /// * On Unix, the socket is placed into nonblocking mode and then a
318    ///   `connect` call is issued.
319    ///
320    /// * On Windows, the address is stored internally and the connect operation
321    ///   is issued when the returned `TcpStream` is registered with an event
322    ///   loop. Note that on Windows you must `bind` a socket before it can be
323    ///   connected, so if a custom `TcpBuilder` is used it should be bound
324    ///   (perhaps to `INADDR_ANY`) before this method is called.
325    pub fn connect_stream(stream: net::TcpStream,
326                          addr: &SocketAddr,
327                          handle: &Handle)
328                          -> Box<Future<Item=TcpStream, Error=io::Error> + Send> {
329        let state = match mio::net::TcpStream::connect_stream(stream, addr) {
330            Ok(tcp) => TcpStream::new(tcp, handle),
331            Err(e) => TcpStreamNewState::Error(e),
332        };
333        Box::new(state)
334    }
335
336    /// Test whether this socket is ready to be read or not.
337    ///
338    /// If the socket is *not* readable then the current task is scheduled to
339    /// get a notification when the socket does become readable. That is, this
340    /// is only suitable for calling in a `Future::poll` method and will
341    /// automatically handle ensuring a retry once the socket is readable again.
342    pub fn poll_read(&self) -> Async<()> {
343        self.io.poll_read_ready(mio::Ready::readable())
344            .map(|r| {
345                if r.is_ready() {
346                    Async::Ready(())
347                } else {
348                    Async::NotReady
349                }
350            })
351            .unwrap_or(().into())
352    }
353
354    /// Test whether this socket is ready to be written to or not.
355    ///
356    /// If the socket is *not* writable then the current task is scheduled to
357    /// get a notification when the socket does become writable. That is, this
358    /// is only suitable for calling in a `Future::poll` method and will
359    /// automatically handle ensuring a retry once the socket is writable again.
360    pub fn poll_write(&self) -> Async<()> {
361        self.io.poll_write_ready()
362            .map(|r| {
363                if r.is_ready() {
364                    Async::Ready(())
365                } else {
366                    Async::NotReady
367                }
368            })
369            .unwrap_or(().into())
370    }
371
372    /// Returns the local address that this stream is bound to.
373    pub fn local_addr(&self) -> io::Result<SocketAddr> {
374        self.io.get_ref().local_addr()
375    }
376
377    /// Returns the remote address that this stream is connected to.
378    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
379        self.io.get_ref().peer_addr()
380    }
381
382    /// Receives data on the socket from the remote address to which it is
383    /// connected, without removing that data from the queue. On success,
384    /// returns the number of bytes peeked.
385    ///
386    /// Successive calls return the same data. This is accomplished by passing
387    /// `MSG_PEEK` as a flag to the underlying recv system call.
388    pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
389        if let Async::NotReady = self.poll_read() {
390            return Err(io::ErrorKind::WouldBlock.into())
391        }
392        let r = self.io.get_ref().peek(buf);
393        if is_wouldblock(&r) {
394            self.io.clear_read_ready(mio::Ready::readable())?;
395        }
396        return r
397
398    }
399
400    /// Shuts down the read, write, or both halves of this connection.
401    ///
402    /// This function will cause all pending and future I/O on the specified
403    /// portions to return immediately with an appropriate value (see the
404    /// documentation of `Shutdown`).
405    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
406        self.io.get_ref().shutdown(how)
407    }
408
409    /// Sets the value of the `TCP_NODELAY` option on this socket.
410    ///
411    /// If set, this option disables the Nagle algorithm. This means that
412    /// segments are always sent as soon as possible, even if there is only a
413    /// small amount of data. When not set, data is buffered until there is a
414    /// sufficient amount to send out, thereby avoiding the frequent sending of
415    /// small packets.
416    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
417        self.io.get_ref().set_nodelay(nodelay)
418    }
419
420    /// Gets the value of the `TCP_NODELAY` option on this socket.
421    ///
422    /// For more information about this option, see [`set_nodelay`][link].
423    ///
424    /// [link]: #method.set_nodelay
425    pub fn nodelay(&self) -> io::Result<bool> {
426        self.io.get_ref().nodelay()
427    }
428
429    /// Sets the value of the `SO_RCVBUF` option on this socket.
430    ///
431    /// Changes the size of the operating system's receive buffer associated
432    /// with the socket.
433    pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
434        self.io.get_ref().set_recv_buffer_size(size)
435    }
436
437    /// Gets the value of the `SO_RCVBUF` option on this socket.
438    ///
439    /// For more information about this option, see
440    /// [`set_recv_buffer_size`][link].
441    ///
442    /// [link]: #tymethod.set_recv_buffer_size
443    pub fn recv_buffer_size(&self) -> io::Result<usize> {
444        self.io.get_ref().recv_buffer_size()
445    }
446
447    /// Sets the value of the `SO_SNDBUF` option on this socket.
448    ///
449    /// Changes the size of the operating system's send buffer associated with
450    /// the socket.
451    pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
452        self.io.get_ref().set_send_buffer_size(size)
453    }
454
455    /// Gets the value of the `SO_SNDBUF` option on this socket.
456    ///
457    /// For more information about this option, see [`set_send_buffer`][link].
458    ///
459    /// [link]: #tymethod.set_send_buffer
460    pub fn send_buffer_size(&self) -> io::Result<usize> {
461        self.io.get_ref().send_buffer_size()
462    }
463
464    /// Sets whether keepalive messages are enabled to be sent on this socket.
465    ///
466    /// On Unix, this option will set the `SO_KEEPALIVE` as well as the
467    /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform).
468    /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option.
469    ///
470    /// If `None` is specified then keepalive messages are disabled, otherwise
471    /// the duration specified will be the time to remain idle before sending a
472    /// TCP keepalive probe.
473    ///
474    /// Some platforms specify this value in seconds, so sub-second
475    /// specifications may be omitted.
476    pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
477        self.io.get_ref().set_keepalive(keepalive)
478    }
479
480    /// Returns whether keepalive messages are enabled on this socket, and if so
481    /// the duration of time between them.
482    ///
483    /// For more information about this option, see [`set_keepalive`][link].
484    ///
485    /// [link]: #tymethod.set_keepalive
486    pub fn keepalive(&self) -> io::Result<Option<Duration>> {
487        self.io.get_ref().keepalive()
488    }
489
490    /// Sets the value for the `IP_TTL` option on this socket.
491    ///
492    /// This value sets the time-to-live field that is used in every packet sent
493    /// from this socket.
494    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
495        self.io.get_ref().set_ttl(ttl)
496    }
497
498    /// Gets the value of the `IP_TTL` option for this socket.
499    ///
500    /// For more information about this option, see [`set_ttl`][link].
501    ///
502    /// [link]: #tymethod.set_ttl
503    pub fn ttl(&self) -> io::Result<u32> {
504        self.io.get_ref().ttl()
505    }
506
507    /// Sets the value for the `IPV6_V6ONLY` option on this socket.
508    ///
509    /// If this is set to `true` then the socket is restricted to sending and
510    /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
511    /// can bind the same port at the same time.
512    ///
513    /// If this is set to `false` then the socket can be used to send and
514    /// receive packets from an IPv4-mapped IPv6 address.
515    pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
516        self.io.get_ref().set_only_v6(only_v6)
517    }
518
519    /// Gets the value of the `IPV6_V6ONLY` option for this socket.
520    ///
521    /// For more information about this option, see [`set_only_v6`][link].
522    ///
523    /// [link]: #tymethod.set_only_v6
524    pub fn only_v6(&self) -> io::Result<bool> {
525        self.io.get_ref().only_v6()
526    }
527
528    /// Sets the linger duration of this socket by setting the SO_LINGER option
529    pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
530        self.io.get_ref().set_linger(dur)
531    }
532
533    /// reads the linger duration for this socket by getting the SO_LINGER option
534    pub fn linger(&self) -> io::Result<Option<Duration>> {
535        self.io.get_ref().linger()
536    }
537
538    #[deprecated(since = "0.1.8", note = "use set_keepalive")]
539    #[doc(hidden)]
540    pub fn set_keepalive_ms(&self, keepalive: Option<u32>) -> io::Result<()> {
541        #[allow(deprecated)]
542        self.io.get_ref().set_keepalive_ms(keepalive)
543    }
544
545    #[deprecated(since = "0.1.8", note = "use keepalive")]
546    #[doc(hidden)]
547    pub fn keepalive_ms(&self) -> io::Result<Option<u32>> {
548        #[allow(deprecated)]
549        self.io.get_ref().keepalive_ms()
550    }
551}
552
553impl Read for TcpStream {
554    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
555        self.io.read(buf)
556    }
557}
558
559impl Write for TcpStream {
560    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
561        self.io.write(buf)
562    }
563    fn flush(&mut self) -> io::Result<()> {
564        Ok(())
565    }
566}
567
568impl AsyncRead for TcpStream {
569    unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
570        false
571    }
572
573    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
574        <&TcpStream>::read_buf(&mut &*self, buf)
575    }
576}
577
578impl AsyncWrite for TcpStream {
579    fn shutdown(&mut self) -> Poll<(), io::Error> {
580        <&TcpStream>::shutdown(&mut &*self)
581    }
582
583    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
584        <&TcpStream>::write_buf(&mut &*self, buf)
585    }
586}
587
588#[allow(deprecated)]
589impl ::io::Io for TcpStream {
590    fn poll_read(&mut self) -> Async<()> {
591        <TcpStream>::poll_read(self)
592    }
593
594    fn poll_write(&mut self) -> Async<()> {
595        <TcpStream>::poll_write(self)
596    }
597
598    fn read_vec(&mut self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
599        if let Async::NotReady = <TcpStream>::poll_read(self) {
600            return Err(io::ErrorKind::WouldBlock.into())
601        }
602        let r = self.io.get_ref().read_bufs(bufs);
603        if is_wouldblock(&r) {
604            self.io.clear_read_ready(mio::Ready::readable())?;
605        }
606        return r
607    }
608
609    fn write_vec(&mut self, bufs: &[&IoVec]) -> io::Result<usize> {
610        if let Async::NotReady = <TcpStream>::poll_write(self) {
611            return Err(io::ErrorKind::WouldBlock.into())
612        }
613        let r = self.io.get_ref().write_bufs(bufs);
614        if is_wouldblock(&r) {
615            self.io.clear_write_ready()?;
616        }
617        return r
618    }
619}
620
/// Returns `true` when `r` is an `Err` whose kind is
/// `io::ErrorKind::WouldBlock`, and `false` for `Ok` or any other error kind.
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
    r.as_ref()
        .err()
        .map_or(false, |e| e.kind() == io::ErrorKind::WouldBlock)
}
627
628impl<'a> Read for &'a TcpStream {
629    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
630        (&self.io).read(buf)
631    }
632}
633
634impl<'a> Write for &'a TcpStream {
635    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
636        (&self.io).write(buf)
637    }
638
639    fn flush(&mut self) -> io::Result<()> {
640        (&self.io).flush()
641    }
642}
643
644impl<'a> AsyncRead for &'a TcpStream {
645    unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
646        false
647    }
648
649    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
650        if let Async::NotReady = <TcpStream>::poll_read(self) {
651            return Ok(Async::NotReady)
652        }
653        let r = unsafe {
654            // The `IoVec` type can't have a 0-length size, so we create a bunch
655            // of dummy versions on the stack with 1 length which we'll quickly
656            // overwrite.
657            let b1: &mut [u8] = &mut [0];
658            let b2: &mut [u8] = &mut [0];
659            let b3: &mut [u8] = &mut [0];
660            let b4: &mut [u8] = &mut [0];
661            let b5: &mut [u8] = &mut [0];
662            let b6: &mut [u8] = &mut [0];
663            let b7: &mut [u8] = &mut [0];
664            let b8: &mut [u8] = &mut [0];
665            let b9: &mut [u8] = &mut [0];
666            let b10: &mut [u8] = &mut [0];
667            let b11: &mut [u8] = &mut [0];
668            let b12: &mut [u8] = &mut [0];
669            let b13: &mut [u8] = &mut [0];
670            let b14: &mut [u8] = &mut [0];
671            let b15: &mut [u8] = &mut [0];
672            let b16: &mut [u8] = &mut [0];
673            let mut bufs: [&mut IoVec; 16] = [
674                b1.into(), b2.into(), b3.into(), b4.into(),
675                b5.into(), b6.into(), b7.into(), b8.into(),
676                b9.into(), b10.into(), b11.into(), b12.into(),
677                b13.into(), b14.into(), b15.into(), b16.into(),
678            ];
679            let n = buf.bytes_vec_mut(&mut bufs);
680            self.io.get_ref().read_bufs(&mut bufs[..n])
681        };
682
683        match r {
684            Ok(n) => {
685                unsafe { buf.advance_mut(n); }
686                Ok(Async::Ready(n))
687            }
688            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
689                self.io.clear_read_ready(mio::Ready::readable())?;
690                Ok(Async::NotReady)
691            }
692            Err(e) => Err(e),
693        }
694    }
695}
696
697impl<'a> AsyncWrite for &'a TcpStream {
698    fn shutdown(&mut self) -> Poll<(), io::Error> {
699        Ok(().into())
700    }
701
702    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
703        if let Async::NotReady = <TcpStream>::poll_write(self) {
704            return Ok(Async::NotReady)
705        }
706        let r = {
707            // The `IoVec` type can't have a zero-length size, so create a dummy
708            // version from a 1-length slice which we'll overwrite with the
709            // `bytes_vec` method.
710            static DUMMY: &[u8] = &[0];
711            let iovec = <&IoVec>::from(DUMMY);
712            let mut bufs = [iovec; 64];
713            let n = buf.bytes_vec(&mut bufs);
714            self.io.get_ref().write_bufs(&bufs[..n])
715        };
716        match r {
717            Ok(n) => {
718                buf.advance(n);
719                Ok(Async::Ready(n))
720            }
721            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
722                self.io.clear_write_ready()?;
723                Ok(Async::NotReady)
724            }
725            Err(e) => Err(e),
726        }
727    }
728}
729
730#[allow(deprecated)]
731impl<'a> ::io::Io for &'a TcpStream {
732    fn poll_read(&mut self) -> Async<()> {
733        <TcpStream>::poll_read(self)
734    }
735
736    fn poll_write(&mut self) -> Async<()> {
737        <TcpStream>::poll_write(self)
738    }
739}
740
741impl fmt::Debug for TcpStream {
742    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
743        self.io.get_ref().fmt(f)
744    }
745}
746
747impl Future for TcpStreamNew {
748    type Item = TcpStream;
749    type Error = io::Error;
750
751    fn poll(&mut self) -> Poll<TcpStream, io::Error> {
752        self.inner.poll()
753    }
754}
755
756impl Future for TcpStreamNewState {
757    type Item = TcpStream;
758    type Error = io::Error;
759
760    fn poll(&mut self) -> Poll<TcpStream, io::Error> {
761        {
762            let stream = match *self {
763                TcpStreamNewState::Waiting(ref s) => s,
764                TcpStreamNewState::Error(_) => {
765                    let e = match mem::replace(self, TcpStreamNewState::Empty) {
766                        TcpStreamNewState::Error(e) => e,
767                        _ => panic!(),
768                    };
769                    return Err(e)
770                }
771                TcpStreamNewState::Empty => panic!("can't poll TCP stream twice"),
772            };
773
774            // Once we've connected, wait for the stream to be writable as
775            // that's when the actual connection has been initiated. Once we're
776            // writable we check for `take_socket_error` to see if the connect
777            // actually hit an error or not.
778            //
779            // If all that succeeded then we ship everything on up.
780            if let Async::NotReady = stream.io.poll_write_ready()? {
781                return Ok(Async::NotReady)
782            }
783            if let Some(e) = try!(stream.io.get_ref().take_error()) {
784                return Err(e)
785            }
786        }
787        match mem::replace(self, TcpStreamNewState::Empty) {
788            TcpStreamNewState::Waiting(stream) => Ok(Async::Ready(stream)),
789            _ => panic!(),
790        }
791    }
792}
793
794#[cfg(all(unix, not(target_os = "fuchsia")))]
795mod sys {
796    use std::os::unix::prelude::*;
797    use super::{TcpStream, TcpListener};
798
799    impl AsRawFd for TcpStream {
800        fn as_raw_fd(&self) -> RawFd {
801            self.io.get_ref().as_raw_fd()
802        }
803    }
804
805    impl AsRawFd for TcpListener {
806        fn as_raw_fd(&self) -> RawFd {
807            self.io.get_ref().as_raw_fd()
808        }
809    }
810}
811
#[cfg(windows)]
mod sys {
    // Intentionally empty for now.
    //
    // TODO: once mio exposes raw-handle access for its TCP types, add
    // `AsRawHandle` implementations for `TcpStream` and `TcpListener` here.
    // Sketch of what they would look like:
    //
    // use std::os::windows::prelude::*;
    // use super::{TcpStream, TcpListener};
    //
    // impl AsRawHandle for TcpStream {
    //     fn as_raw_handle(&self) -> RawHandle {
    //         self.io.get_ref().as_raw_handle()
    //     }
    // }
    //
    // impl AsRawHandle for TcpListener {
    //     fn as_raw_handle(&self) -> RawHandle {
    //         self.listener.io().as_raw_handle()
    //     }
    // }
}