tokio_core/net/tcp.rs
1use std::fmt;
2use std::io::{self, Read, Write};
3use std::mem;
4use std::net::{self, SocketAddr, Shutdown};
5use std::time::Duration;
6
7use bytes::{Buf, BufMut};
8use futures::stream::Stream;
9use futures::{Future, Poll, Async};
10use iovec::IoVec;
11use mio;
12use tokio_io::{AsyncRead, AsyncWrite};
13
14use reactor::{Handle, PollEvented2};
15
16/// An I/O object representing a TCP socket listening for incoming connections.
17///
18/// This object can be converted into a stream of incoming connections for
19/// various forms of processing.
20pub struct TcpListener {
21 io: PollEvented2<mio::net::TcpListener>,
22}
23
24/// Stream returned by the `TcpListener::incoming` function representing the
25/// stream of sockets received from a listener.
26#[must_use = "streams do nothing unless polled"]
27pub struct Incoming {
28 inner: TcpListener,
29}
30
31impl TcpListener {
32 /// Create a new TCP listener associated with this event loop.
33 ///
34 /// The TCP listener will bind to the provided `addr` address, if available.
35 /// If the result is `Ok`, the socket has successfully bound.
36 pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
37 let l = try!(mio::net::TcpListener::bind(addr));
38 TcpListener::new(l, handle)
39 }
40
41 /// Create a new TCP listener associated with this event loop.
42 ///
43 /// This is the same as `bind` but uses the default reactor instead of an
44 /// explicit `&Handle`.
45 pub fn bind2(addr: &SocketAddr) -> io::Result<TcpListener> {
46 let l = try!(mio::net::TcpListener::bind(addr));
47 TcpListener::new2(l)
48 }
49
50 /// Attempt to accept a connection and create a new connected `TcpStream` if
51 /// successful.
52 ///
53 /// This function will attempt an accept operation, but will not block
54 /// waiting for it to complete. If the operation would block then a "would
55 /// block" error is returned. Additionally, if this method would block, it
56 /// registers the current task to receive a notification when it would
57 /// otherwise not block.
58 ///
59 /// Note that typically for simple usage it's easier to treat incoming
60 /// connections as a `Stream` of `TcpStream`s with the `incoming` method
61 /// below.
62 ///
63 /// # Panics
64 ///
65 /// This function will panic if it is called outside the context of a
66 /// future's task. It's recommended to only call this from the
67 /// implementation of a `Future::poll`, if necessary.
68 pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
69 let (io, addr) = self.accept_std()?;
70
71 let io = mio::net::TcpStream::from_stream(io)?;
72 let io = PollEvented2::new(io);
73 let io = TcpStream { io };
74
75 Ok((io, addr))
76 }
77
78 /// Like `accept`, except that it returns a raw `std::net::TcpStream`.
79 ///
80 /// The stream is *in blocking mode*, and is not associated with the Tokio
81 /// event loop.
82 pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> {
83 if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? {
84 return Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready"))
85 }
86
87 match self.io.get_ref().accept_std() {
88 Err(e) => {
89 if e.kind() == io::ErrorKind::WouldBlock {
90 self.io.clear_read_ready(mio::Ready::readable())?;
91 }
92 Err(e)
93 },
94 Ok((sock, addr)) => Ok((sock, addr)),
95 }
96 }
97
98 /// Create a new TCP listener from the standard library's TCP listener.
99 ///
100 /// This method can be used when the `Handle::tcp_listen` method isn't
101 /// sufficient because perhaps some more configuration is needed in terms of
102 /// before the calls to `bind` and `listen`.
103 ///
104 /// This API is typically paired with the `net2` crate and the `TcpBuilder`
105 /// type to build up and customize a listener before it's shipped off to the
106 /// backing event loop. This allows configuration of options like
107 /// `SO_REUSEPORT`, binding to multiple addresses, etc.
108 ///
109 /// The `addr` argument here is one of the addresses that `listener` is
110 /// bound to and the listener will only be guaranteed to accept connections
111 /// of the same address type currently.
112 ///
113 /// Finally, the `handle` argument is the event loop that this listener will
114 /// be bound to.
115 ///
116 /// The platform specific behavior of this function looks like:
117 ///
118 /// * On Unix, the socket is placed into nonblocking mode and connections
119 /// can be accepted as normal
120 ///
121 /// * On Windows, the address is stored internally and all future accepts
122 /// will only be for the same IP version as `addr` specified. That is, if
123 /// `addr` is an IPv4 address then all sockets accepted will be IPv4 as
124 /// well (same for IPv6).
125 pub fn from_listener(listener: net::TcpListener,
126 _addr: &SocketAddr,
127 handle: &Handle) -> io::Result<TcpListener> {
128 let l = try!(mio::net::TcpListener::from_std(listener));
129 TcpListener::new(l, handle)
130 }
131
132 fn new(listener: mio::net::TcpListener, handle: &Handle)
133 -> io::Result<TcpListener> {
134 let io = try!(PollEvented2::new_with_handle(listener, handle.new_tokio_handle()));
135 Ok(TcpListener { io: io })
136 }
137
138 fn new2(listener: mio::net::TcpListener)
139 -> io::Result<TcpListener> {
140 let io = PollEvented2::new(listener);
141 Ok(TcpListener { io: io })
142 }
143
144 /// Test whether this socket is ready to be read or not.
145 pub fn poll_read(&self) -> Async<()> {
146 self.io.poll_read_ready(mio::Ready::readable())
147 .map(|r| {
148 if r.is_ready() {
149 Async::Ready(())
150 } else {
151 Async::NotReady
152 }
153 })
154 .unwrap_or(().into())
155 }
156
157 /// Returns the local address that this listener is bound to.
158 ///
159 /// This can be useful, for example, when binding to port 0 to figure out
160 /// which port was actually bound.
161 pub fn local_addr(&self) -> io::Result<SocketAddr> {
162 self.io.get_ref().local_addr()
163 }
164
165 /// Consumes this listener, returning a stream of the sockets this listener
166 /// accepts.
167 ///
168 /// This method returns an implementation of the `Stream` trait which
169 /// resolves to the sockets the are accepted on this listener.
170 pub fn incoming(self) -> Incoming {
171 Incoming { inner: self }
172 }
173
174 /// Sets the value for the `IP_TTL` option on this socket.
175 ///
176 /// This value sets the time-to-live field that is used in every packet sent
177 /// from this socket.
178 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
179 self.io.get_ref().set_ttl(ttl)
180 }
181
182 /// Gets the value of the `IP_TTL` option for this socket.
183 ///
184 /// For more information about this option, see [`set_ttl`][link].
185 ///
186 /// [link]: #method.set_ttl
187 pub fn ttl(&self) -> io::Result<u32> {
188 self.io.get_ref().ttl()
189 }
190
191 /// Sets the value for the `IPV6_V6ONLY` option on this socket.
192 ///
193 /// If this is set to `true` then the socket is restricted to sending and
194 /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
195 /// can bind the same port at the same time.
196 ///
197 /// If this is set to `false` then the socket can be used to send and
198 /// receive packets from an IPv4-mapped IPv6 address.
199 pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
200 self.io.get_ref().set_only_v6(only_v6)
201 }
202
203 /// Gets the value of the `IPV6_V6ONLY` option for this socket.
204 ///
205 /// For more information about this option, see [`set_only_v6`][link].
206 ///
207 /// [link]: #method.set_only_v6
208 pub fn only_v6(&self) -> io::Result<bool> {
209 self.io.get_ref().only_v6()
210 }
211}
212
213impl fmt::Debug for TcpListener {
214 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
215 self.io.get_ref().fmt(f)
216 }
217}
218
219impl Stream for Incoming {
220 type Item = (TcpStream, SocketAddr);
221 type Error = io::Error;
222
223 fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
224 Ok(Async::Ready(Some(try_nb!(self.inner.accept()))))
225 }
226}
227
228/// An I/O object representing a TCP stream connected to a remote endpoint.
229///
230/// A TCP stream can either be created by connecting to an endpoint or by
231/// accepting a connection from a listener. Inside the stream is access to the
232/// raw underlying I/O object as well as streams for the read/write
233/// notifications on the stream itself.
234pub struct TcpStream {
235 io: PollEvented2<mio::net::TcpStream>,
236}
237
238/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
239/// when the stream is connected.
240#[must_use = "futures do nothing unless polled"]
241pub struct TcpStreamNew {
242 inner: TcpStreamNewState,
243}
244
245#[must_use = "futures do nothing unless polled"]
246enum TcpStreamNewState {
247 Waiting(TcpStream),
248 Error(io::Error),
249 Empty,
250}
251
252impl TcpStream {
253 /// Create a new TCP stream connected to the specified address.
254 ///
255 /// This function will create a new TCP socket and attempt to connect it to
256 /// the `addr` provided. The returned future will be resolved once the
257 /// stream has successfully connected. If an error happens during the
258 /// connection or during the socket creation, that error will be returned to
259 /// the future instead.
260 pub fn connect(addr: &SocketAddr, handle: &Handle) -> TcpStreamNew {
261 let inner = match mio::net::TcpStream::connect(addr) {
262 Ok(tcp) => TcpStream::new(tcp, handle),
263 Err(e) => TcpStreamNewState::Error(e),
264 };
265 TcpStreamNew { inner: inner }
266 }
267
268 /// Create a new TCP stream connected to the specified address.
269 ///
270 /// This is the same as `connect`, but uses the default reactor instead of
271 /// taking an explicit `&Handle`.
272 pub fn connect2(addr: &SocketAddr) -> TcpStreamNew {
273 let inner = match mio::net::TcpStream::connect(addr) {
274 Ok(tcp) => TcpStream::new2(tcp),
275 Err(e) => TcpStreamNewState::Error(e),
276 };
277 TcpStreamNew { inner: inner }
278 }
279
280 fn new(connected_stream: mio::net::TcpStream, handle: &Handle)
281 -> TcpStreamNewState {
282 match PollEvented2::new_with_handle(connected_stream, handle.new_tokio_handle()) {
283 Ok(io) => TcpStreamNewState::Waiting(TcpStream { io: io }),
284 Err(e) => TcpStreamNewState::Error(e),
285 }
286 }
287
288 fn new2(connected_stream: mio::net::TcpStream)
289 -> TcpStreamNewState {
290 let io = PollEvented2::new(connected_stream);
291 TcpStreamNewState::Waiting(TcpStream { io: io })
292 }
293
294 /// Create a new `TcpStream` from a `net::TcpStream`.
295 ///
296 /// This function will convert a TCP stream in the standard library to a TCP
297 /// stream ready to be used with the provided event loop handle. The object
298 /// returned is associated with the event loop and ready to perform I/O.
299 pub fn from_stream(stream: net::TcpStream, handle: &Handle)
300 -> io::Result<TcpStream> {
301 let inner = try!(mio::net::TcpStream::from_stream(stream));
302 Ok(TcpStream {
303 io: try!(PollEvented2::new_with_handle(inner, handle.new_tokio_handle())),
304 })
305 }
306
307 /// Creates a new `TcpStream` from the pending socket inside the given
308 /// `std::net::TcpStream`, connecting it to the address specified.
309 ///
310 /// This constructor allows configuring the socket before it's actually
311 /// connected, and this function will transfer ownership to the returned
312 /// `TcpStream` if successful. An unconnected `TcpStream` can be created
313 /// with the `net2::TcpBuilder` type (and also configured via that route).
314 ///
315 /// The platform specific behavior of this function looks like:
316 ///
317 /// * On Unix, the socket is placed into nonblocking mode and then a
318 /// `connect` call is issued.
319 ///
320 /// * On Windows, the address is stored internally and the connect operation
321 /// is issued when the returned `TcpStream` is registered with an event
322 /// loop. Note that on Windows you must `bind` a socket before it can be
323 /// connected, so if a custom `TcpBuilder` is used it should be bound
324 /// (perhaps to `INADDR_ANY`) before this method is called.
325 pub fn connect_stream(stream: net::TcpStream,
326 addr: &SocketAddr,
327 handle: &Handle)
328 -> Box<Future<Item=TcpStream, Error=io::Error> + Send> {
329 let state = match mio::net::TcpStream::connect_stream(stream, addr) {
330 Ok(tcp) => TcpStream::new(tcp, handle),
331 Err(e) => TcpStreamNewState::Error(e),
332 };
333 Box::new(state)
334 }
335
336 /// Test whether this socket is ready to be read or not.
337 ///
338 /// If the socket is *not* readable then the current task is scheduled to
339 /// get a notification when the socket does become readable. That is, this
340 /// is only suitable for calling in a `Future::poll` method and will
341 /// automatically handle ensuring a retry once the socket is readable again.
342 pub fn poll_read(&self) -> Async<()> {
343 self.io.poll_read_ready(mio::Ready::readable())
344 .map(|r| {
345 if r.is_ready() {
346 Async::Ready(())
347 } else {
348 Async::NotReady
349 }
350 })
351 .unwrap_or(().into())
352 }
353
354 /// Test whether this socket is ready to be written to or not.
355 ///
356 /// If the socket is *not* writable then the current task is scheduled to
357 /// get a notification when the socket does become writable. That is, this
358 /// is only suitable for calling in a `Future::poll` method and will
359 /// automatically handle ensuring a retry once the socket is writable again.
360 pub fn poll_write(&self) -> Async<()> {
361 self.io.poll_write_ready()
362 .map(|r| {
363 if r.is_ready() {
364 Async::Ready(())
365 } else {
366 Async::NotReady
367 }
368 })
369 .unwrap_or(().into())
370 }
371
372 /// Returns the local address that this stream is bound to.
373 pub fn local_addr(&self) -> io::Result<SocketAddr> {
374 self.io.get_ref().local_addr()
375 }
376
377 /// Returns the remote address that this stream is connected to.
378 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
379 self.io.get_ref().peer_addr()
380 }
381
382 /// Receives data on the socket from the remote address to which it is
383 /// connected, without removing that data from the queue. On success,
384 /// returns the number of bytes peeked.
385 ///
386 /// Successive calls return the same data. This is accomplished by passing
387 /// `MSG_PEEK` as a flag to the underlying recv system call.
388 pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
389 if let Async::NotReady = self.poll_read() {
390 return Err(io::ErrorKind::WouldBlock.into())
391 }
392 let r = self.io.get_ref().peek(buf);
393 if is_wouldblock(&r) {
394 self.io.clear_read_ready(mio::Ready::readable())?;
395 }
396 return r
397
398 }
399
400 /// Shuts down the read, write, or both halves of this connection.
401 ///
402 /// This function will cause all pending and future I/O on the specified
403 /// portions to return immediately with an appropriate value (see the
404 /// documentation of `Shutdown`).
405 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
406 self.io.get_ref().shutdown(how)
407 }
408
409 /// Sets the value of the `TCP_NODELAY` option on this socket.
410 ///
411 /// If set, this option disables the Nagle algorithm. This means that
412 /// segments are always sent as soon as possible, even if there is only a
413 /// small amount of data. When not set, data is buffered until there is a
414 /// sufficient amount to send out, thereby avoiding the frequent sending of
415 /// small packets.
416 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
417 self.io.get_ref().set_nodelay(nodelay)
418 }
419
420 /// Gets the value of the `TCP_NODELAY` option on this socket.
421 ///
422 /// For more information about this option, see [`set_nodelay`][link].
423 ///
424 /// [link]: #method.set_nodelay
425 pub fn nodelay(&self) -> io::Result<bool> {
426 self.io.get_ref().nodelay()
427 }
428
429 /// Sets the value of the `SO_RCVBUF` option on this socket.
430 ///
431 /// Changes the size of the operating system's receive buffer associated
432 /// with the socket.
433 pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
434 self.io.get_ref().set_recv_buffer_size(size)
435 }
436
437 /// Gets the value of the `SO_RCVBUF` option on this socket.
438 ///
439 /// For more information about this option, see
440 /// [`set_recv_buffer_size`][link].
441 ///
442 /// [link]: #tymethod.set_recv_buffer_size
443 pub fn recv_buffer_size(&self) -> io::Result<usize> {
444 self.io.get_ref().recv_buffer_size()
445 }
446
447 /// Sets the value of the `SO_SNDBUF` option on this socket.
448 ///
449 /// Changes the size of the operating system's send buffer associated with
450 /// the socket.
451 pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
452 self.io.get_ref().set_send_buffer_size(size)
453 }
454
455 /// Gets the value of the `SO_SNDBUF` option on this socket.
456 ///
457 /// For more information about this option, see [`set_send_buffer`][link].
458 ///
459 /// [link]: #tymethod.set_send_buffer
460 pub fn send_buffer_size(&self) -> io::Result<usize> {
461 self.io.get_ref().send_buffer_size()
462 }
463
464 /// Sets whether keepalive messages are enabled to be sent on this socket.
465 ///
466 /// On Unix, this option will set the `SO_KEEPALIVE` as well as the
467 /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform).
468 /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option.
469 ///
470 /// If `None` is specified then keepalive messages are disabled, otherwise
471 /// the duration specified will be the time to remain idle before sending a
472 /// TCP keepalive probe.
473 ///
474 /// Some platforms specify this value in seconds, so sub-second
475 /// specifications may be omitted.
476 pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
477 self.io.get_ref().set_keepalive(keepalive)
478 }
479
480 /// Returns whether keepalive messages are enabled on this socket, and if so
481 /// the duration of time between them.
482 ///
483 /// For more information about this option, see [`set_keepalive`][link].
484 ///
485 /// [link]: #tymethod.set_keepalive
486 pub fn keepalive(&self) -> io::Result<Option<Duration>> {
487 self.io.get_ref().keepalive()
488 }
489
490 /// Sets the value for the `IP_TTL` option on this socket.
491 ///
492 /// This value sets the time-to-live field that is used in every packet sent
493 /// from this socket.
494 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
495 self.io.get_ref().set_ttl(ttl)
496 }
497
498 /// Gets the value of the `IP_TTL` option for this socket.
499 ///
500 /// For more information about this option, see [`set_ttl`][link].
501 ///
502 /// [link]: #tymethod.set_ttl
503 pub fn ttl(&self) -> io::Result<u32> {
504 self.io.get_ref().ttl()
505 }
506
507 /// Sets the value for the `IPV6_V6ONLY` option on this socket.
508 ///
509 /// If this is set to `true` then the socket is restricted to sending and
510 /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
511 /// can bind the same port at the same time.
512 ///
513 /// If this is set to `false` then the socket can be used to send and
514 /// receive packets from an IPv4-mapped IPv6 address.
515 pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
516 self.io.get_ref().set_only_v6(only_v6)
517 }
518
519 /// Gets the value of the `IPV6_V6ONLY` option for this socket.
520 ///
521 /// For more information about this option, see [`set_only_v6`][link].
522 ///
523 /// [link]: #tymethod.set_only_v6
524 pub fn only_v6(&self) -> io::Result<bool> {
525 self.io.get_ref().only_v6()
526 }
527
528 /// Sets the linger duration of this socket by setting the SO_LINGER option
529 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
530 self.io.get_ref().set_linger(dur)
531 }
532
533 /// reads the linger duration for this socket by getting the SO_LINGER option
534 pub fn linger(&self) -> io::Result<Option<Duration>> {
535 self.io.get_ref().linger()
536 }
537
538 #[deprecated(since = "0.1.8", note = "use set_keepalive")]
539 #[doc(hidden)]
540 pub fn set_keepalive_ms(&self, keepalive: Option<u32>) -> io::Result<()> {
541 #[allow(deprecated)]
542 self.io.get_ref().set_keepalive_ms(keepalive)
543 }
544
545 #[deprecated(since = "0.1.8", note = "use keepalive")]
546 #[doc(hidden)]
547 pub fn keepalive_ms(&self) -> io::Result<Option<u32>> {
548 #[allow(deprecated)]
549 self.io.get_ref().keepalive_ms()
550 }
551}
552
553impl Read for TcpStream {
554 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
555 self.io.read(buf)
556 }
557}
558
559impl Write for TcpStream {
560 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
561 self.io.write(buf)
562 }
563 fn flush(&mut self) -> io::Result<()> {
564 Ok(())
565 }
566}
567
568impl AsyncRead for TcpStream {
569 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
570 false
571 }
572
573 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
574 <&TcpStream>::read_buf(&mut &*self, buf)
575 }
576}
577
578impl AsyncWrite for TcpStream {
579 fn shutdown(&mut self) -> Poll<(), io::Error> {
580 <&TcpStream>::shutdown(&mut &*self)
581 }
582
583 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
584 <&TcpStream>::write_buf(&mut &*self, buf)
585 }
586}
587
588#[allow(deprecated)]
589impl ::io::Io for TcpStream {
590 fn poll_read(&mut self) -> Async<()> {
591 <TcpStream>::poll_read(self)
592 }
593
594 fn poll_write(&mut self) -> Async<()> {
595 <TcpStream>::poll_write(self)
596 }
597
598 fn read_vec(&mut self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
599 if let Async::NotReady = <TcpStream>::poll_read(self) {
600 return Err(io::ErrorKind::WouldBlock.into())
601 }
602 let r = self.io.get_ref().read_bufs(bufs);
603 if is_wouldblock(&r) {
604 self.io.clear_read_ready(mio::Ready::readable())?;
605 }
606 return r
607 }
608
609 fn write_vec(&mut self, bufs: &[&IoVec]) -> io::Result<usize> {
610 if let Async::NotReady = <TcpStream>::poll_write(self) {
611 return Err(io::ErrorKind::WouldBlock.into())
612 }
613 let r = self.io.get_ref().write_bufs(bufs);
614 if is_wouldblock(&r) {
615 self.io.clear_write_ready()?;
616 }
617 return r
618 }
619}
620
/// Returns `true` when `r` is an `Err` whose kind is
/// `io::ErrorKind::WouldBlock`, and `false` for `Ok` or any other error.
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
    r.as_ref()
        .err()
        .map_or(false, |e| e.kind() == io::ErrorKind::WouldBlock)
}
627
628impl<'a> Read for &'a TcpStream {
629 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
630 (&self.io).read(buf)
631 }
632}
633
634impl<'a> Write for &'a TcpStream {
635 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
636 (&self.io).write(buf)
637 }
638
639 fn flush(&mut self) -> io::Result<()> {
640 (&self.io).flush()
641 }
642}
643
644impl<'a> AsyncRead for &'a TcpStream {
645 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
646 false
647 }
648
649 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
650 if let Async::NotReady = <TcpStream>::poll_read(self) {
651 return Ok(Async::NotReady)
652 }
653 let r = unsafe {
654 // The `IoVec` type can't have a 0-length size, so we create a bunch
655 // of dummy versions on the stack with 1 length which we'll quickly
656 // overwrite.
657 let b1: &mut [u8] = &mut [0];
658 let b2: &mut [u8] = &mut [0];
659 let b3: &mut [u8] = &mut [0];
660 let b4: &mut [u8] = &mut [0];
661 let b5: &mut [u8] = &mut [0];
662 let b6: &mut [u8] = &mut [0];
663 let b7: &mut [u8] = &mut [0];
664 let b8: &mut [u8] = &mut [0];
665 let b9: &mut [u8] = &mut [0];
666 let b10: &mut [u8] = &mut [0];
667 let b11: &mut [u8] = &mut [0];
668 let b12: &mut [u8] = &mut [0];
669 let b13: &mut [u8] = &mut [0];
670 let b14: &mut [u8] = &mut [0];
671 let b15: &mut [u8] = &mut [0];
672 let b16: &mut [u8] = &mut [0];
673 let mut bufs: [&mut IoVec; 16] = [
674 b1.into(), b2.into(), b3.into(), b4.into(),
675 b5.into(), b6.into(), b7.into(), b8.into(),
676 b9.into(), b10.into(), b11.into(), b12.into(),
677 b13.into(), b14.into(), b15.into(), b16.into(),
678 ];
679 let n = buf.bytes_vec_mut(&mut bufs);
680 self.io.get_ref().read_bufs(&mut bufs[..n])
681 };
682
683 match r {
684 Ok(n) => {
685 unsafe { buf.advance_mut(n); }
686 Ok(Async::Ready(n))
687 }
688 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
689 self.io.clear_read_ready(mio::Ready::readable())?;
690 Ok(Async::NotReady)
691 }
692 Err(e) => Err(e),
693 }
694 }
695}
696
697impl<'a> AsyncWrite for &'a TcpStream {
698 fn shutdown(&mut self) -> Poll<(), io::Error> {
699 Ok(().into())
700 }
701
702 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
703 if let Async::NotReady = <TcpStream>::poll_write(self) {
704 return Ok(Async::NotReady)
705 }
706 let r = {
707 // The `IoVec` type can't have a zero-length size, so create a dummy
708 // version from a 1-length slice which we'll overwrite with the
709 // `bytes_vec` method.
710 static DUMMY: &[u8] = &[0];
711 let iovec = <&IoVec>::from(DUMMY);
712 let mut bufs = [iovec; 64];
713 let n = buf.bytes_vec(&mut bufs);
714 self.io.get_ref().write_bufs(&bufs[..n])
715 };
716 match r {
717 Ok(n) => {
718 buf.advance(n);
719 Ok(Async::Ready(n))
720 }
721 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
722 self.io.clear_write_ready()?;
723 Ok(Async::NotReady)
724 }
725 Err(e) => Err(e),
726 }
727 }
728}
729
730#[allow(deprecated)]
731impl<'a> ::io::Io for &'a TcpStream {
732 fn poll_read(&mut self) -> Async<()> {
733 <TcpStream>::poll_read(self)
734 }
735
736 fn poll_write(&mut self) -> Async<()> {
737 <TcpStream>::poll_write(self)
738 }
739}
740
741impl fmt::Debug for TcpStream {
742 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
743 self.io.get_ref().fmt(f)
744 }
745}
746
747impl Future for TcpStreamNew {
748 type Item = TcpStream;
749 type Error = io::Error;
750
751 fn poll(&mut self) -> Poll<TcpStream, io::Error> {
752 self.inner.poll()
753 }
754}
755
756impl Future for TcpStreamNewState {
757 type Item = TcpStream;
758 type Error = io::Error;
759
760 fn poll(&mut self) -> Poll<TcpStream, io::Error> {
761 {
762 let stream = match *self {
763 TcpStreamNewState::Waiting(ref s) => s,
764 TcpStreamNewState::Error(_) => {
765 let e = match mem::replace(self, TcpStreamNewState::Empty) {
766 TcpStreamNewState::Error(e) => e,
767 _ => panic!(),
768 };
769 return Err(e)
770 }
771 TcpStreamNewState::Empty => panic!("can't poll TCP stream twice"),
772 };
773
774 // Once we've connected, wait for the stream to be writable as
775 // that's when the actual connection has been initiated. Once we're
776 // writable we check for `take_socket_error` to see if the connect
777 // actually hit an error or not.
778 //
779 // If all that succeeded then we ship everything on up.
780 if let Async::NotReady = stream.io.poll_write_ready()? {
781 return Ok(Async::NotReady)
782 }
783 if let Some(e) = try!(stream.io.get_ref().take_error()) {
784 return Err(e)
785 }
786 }
787 match mem::replace(self, TcpStreamNewState::Empty) {
788 TcpStreamNewState::Waiting(stream) => Ok(Async::Ready(stream)),
789 _ => panic!(),
790 }
791 }
792}
793
794#[cfg(all(unix, not(target_os = "fuchsia")))]
795mod sys {
796 use std::os::unix::prelude::*;
797 use super::{TcpStream, TcpListener};
798
799 impl AsRawFd for TcpStream {
800 fn as_raw_fd(&self) -> RawFd {
801 self.io.get_ref().as_raw_fd()
802 }
803 }
804
805 impl AsRawFd for TcpListener {
806 fn as_raw_fd(&self) -> RawFd {
807 self.io.get_ref().as_raw_fd()
808 }
809 }
810}
811
// Intentionally empty on Windows for now; see the TODO inside.
#[cfg(windows)]
mod sys {
    // TODO: let's land these upstream with mio and then we can add them here.
    //
    // Sketch of the intended implementations, written against the `io` field
    // the two types have in this file:
    //
    // use std::os::windows::prelude::*;
    // use super::{TcpStream, TcpListener};
    //
    // impl AsRawHandle for TcpStream {
    //     fn as_raw_handle(&self) -> RawHandle {
    //         self.io.get_ref().as_raw_handle()
    //     }
    // }
    //
    // impl AsRawHandle for TcpListener {
    //     fn as_raw_handle(&self) -> RawHandle {
    //         self.io.get_ref().as_raw_handle()
    //     }
    // }
}
830}