async_io/
lib.rs

1//! Async I/O and timers.
2//!
3//! This crate provides two tools:
4//!
5//! * [`Async`], an adapter for standard networking types (and [many other] types) to use in
6//!   async programs.
7//! * [`Timer`], a future or stream that emits timed events.
8//!
9//! For concrete async networking types built on top of this crate, see [`async-net`].
10//!
11//! [many other]: https://github.com/smol-rs/async-io/tree/master/examples
12//! [`async-net`]: https://docs.rs/async-net
13//!
14//! # Implementation
15//!
16//! The first time [`Async`] or [`Timer`] is used, a thread named "async-io" will be spawned.
17//! The purpose of this thread is to wait for I/O events reported by the operating system, and then
18//! wake appropriate futures blocked on I/O or timers when they can be resumed.
19//!
20//! To wait for the next I/O event, the "async-io" thread uses [epoll] on Linux/Android/illumos,
21//! [kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [IOCP] on Windows. That
22//! functionality is provided by the [`polling`] crate.
23//!
24//! However, note that you can also process I/O events and wake futures on any thread using the
25//! [`block_on()`] function. The "async-io" thread is therefore just a fallback mechanism
26//! processing I/O events in case no other threads are.
27//!
28//! [epoll]: https://en.wikipedia.org/wiki/Epoll
29//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
30//! [event ports]: https://illumos.org/man/port_create
31//! [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
32//! [`polling`]: https://docs.rs/polling
33//!
34//! # Examples
35//!
36//! Connect to `example.com:80`, or time out after 10 seconds.
37//!
38//! ```
39//! use async_io::{Async, Timer};
40//! use futures_lite::{future::FutureExt, io};
41//!
42//! use std::net::{TcpStream, ToSocketAddrs};
43//! use std::time::Duration;
44//!
45//! # futures_lite::future::block_on(async {
46//! let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
47//!
48//! let stream = Async::<TcpStream>::connect(addr).or(async {
49//!     Timer::after(Duration::from_secs(10)).await;
50//!     Err(io::ErrorKind::TimedOut.into())
51//! })
52//! .await?;
53//! # std::io::Result::Ok(()) });
54//! ```
55
56#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
57#![doc(
58    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
59)]
60#![doc(
61    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
62)]
63
64use std::future::Future;
65use std::io::{self, IoSlice, IoSliceMut, Read, Write};
66use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
67use std::pin::Pin;
68use std::sync::Arc;
69use std::task::{Context, Poll, Waker};
70use std::time::{Duration, Instant};
71
72#[cfg(unix)]
73use std::{
74    os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd},
75    os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
76    path::Path,
77};
78
79#[cfg(windows)]
80use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket};
81
82use futures_io::{AsyncRead, AsyncWrite};
83use futures_lite::stream::{self, Stream};
84use futures_lite::{future, pin, ready};
85
86use rustix::io as rio;
87use rustix::net as rn;
88
89use crate::reactor::{Reactor, Registration, Source};
90
91mod driver;
92mod reactor;
93
94pub mod os;
95
96pub use driver::block_on;
97pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
98
99/// A future or stream that emits timed events.
100///
101/// Timers are futures that output a single [`Instant`] when they fire.
102///
103/// Timers are also streams that can output [`Instant`]s periodically.
104///
105/// # Precision
106///
107/// There is a limit on the maximum precision that a `Timer` can provide. This limit is
108/// dependent on the current platform; for instance, on Windows, the maximum precision is
109/// about 16 milliseconds. Because of this limit, the timer may sleep for longer than the
110/// requested duration. It will never sleep for less.
111///
112/// # Examples
113///
114/// Sleep for 1 second:
115///
116/// ```
117/// use async_io::Timer;
118/// use std::time::Duration;
119///
120/// # futures_lite::future::block_on(async {
121/// Timer::after(Duration::from_secs(1)).await;
122/// # });
123/// ```
124///
125/// Timeout after 1 second:
126///
127/// ```
128/// use async_io::Timer;
129/// use futures_lite::FutureExt;
130/// use std::time::Duration;
131///
132/// # futures_lite::future::block_on(async {
133/// let addrs = async_net::resolve("google.com:80")
134///     .or(async {
135///         Timer::after(Duration::from_secs(1)).await;
136///         Err(std::io::ErrorKind::TimedOut.into())
137///     })
138///     .await?;
139/// # std::io::Result::Ok(()) });
140/// ```
#[derive(Debug)]
pub struct Timer {
    /// This timer's ID in the reactor and the last waker that polled it.
    ///
    /// When this field is set to `None`, this timer is not registered in the reactor.
    id_and_waker: Option<(usize, Waker)>,

    /// The next instant at which this timer fires.
    ///
    /// If this timer is a blank timer (one that will never fire), this value is `None`.
    /// Otherwise, this value contains the next instant at which the timer must fire.
    when: Option<Instant>,

    /// The period that is added to `when` every time the timer fires.
    ///
    /// One-shot constructors such as `Timer::at()` store `Duration::MAX` here; when adding
    /// the period to `when` overflows, the timer becomes blank after it has fired.
    period: Duration,
}
158
159impl Timer {
160    /// Creates a timer that will never fire.
161    ///
162    /// # Examples
163    ///
164    /// This function may also be useful for creating a function with an optional timeout.
165    ///
166    /// ```
167    /// # futures_lite::future::block_on(async {
168    /// use async_io::Timer;
169    /// use futures_lite::prelude::*;
170    /// use std::time::Duration;
171    ///
172    /// async fn run_with_timeout(timeout: Option<Duration>) {
173    ///     let timer = timeout
174    ///         .map(|timeout| Timer::after(timeout))
175    ///         .unwrap_or_else(Timer::never);
176    ///
177    ///     run_lengthy_operation().or(timer).await;
178    /// }
179    /// # // Note that since a Timer as a Future returns an Instant,
180    /// # // this function needs to return an Instant to be used
181    /// # // in "or".
182    /// # async fn run_lengthy_operation() -> std::time::Instant {
183    /// #    std::time::Instant::now()
184    /// # }
185    ///
186    /// // Times out after 5 seconds.
187    /// run_with_timeout(Some(Duration::from_secs(5))).await;
188    /// // Does not time out.
189    /// run_with_timeout(None).await;
190    /// # });
191    /// ```
192    pub fn never() -> Timer {
193        Timer {
194            id_and_waker: None,
195            when: None,
196            period: Duration::MAX,
197        }
198    }
199
200    /// Creates a timer that emits an event once after the given duration of time.
201    ///
202    /// # Examples
203    ///
204    /// ```
205    /// use async_io::Timer;
206    /// use std::time::Duration;
207    ///
208    /// # futures_lite::future::block_on(async {
209    /// Timer::after(Duration::from_secs(1)).await;
210    /// # });
211    /// ```
212    pub fn after(duration: Duration) -> Timer {
213        Instant::now()
214            .checked_add(duration)
215            .map_or_else(Timer::never, Timer::at)
216    }
217
218    /// Creates a timer that emits an event once at the given time instant.
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// use async_io::Timer;
224    /// use std::time::{Duration, Instant};
225    ///
226    /// # futures_lite::future::block_on(async {
227    /// let now = Instant::now();
228    /// let when = now + Duration::from_secs(1);
229    /// Timer::at(when).await;
230    /// # });
231    /// ```
232    pub fn at(instant: Instant) -> Timer {
233        Timer::interval_at(instant, Duration::MAX)
234    }
235
236    /// Creates a timer that emits events periodically.
237    ///
238    /// # Examples
239    ///
240    /// ```
241    /// use async_io::Timer;
242    /// use futures_lite::StreamExt;
243    /// use std::time::{Duration, Instant};
244    ///
245    /// # futures_lite::future::block_on(async {
246    /// let period = Duration::from_secs(1);
247    /// Timer::interval(period).next().await;
248    /// # });
249    /// ```
250    pub fn interval(period: Duration) -> Timer {
251        Instant::now()
252            .checked_add(period)
253            .map_or_else(Timer::never, |at| Timer::interval_at(at, period))
254    }
255
256    /// Creates a timer that emits events periodically, starting at `start`.
257    ///
258    /// # Examples
259    ///
260    /// ```
261    /// use async_io::Timer;
262    /// use futures_lite::StreamExt;
263    /// use std::time::{Duration, Instant};
264    ///
265    /// # futures_lite::future::block_on(async {
266    /// let start = Instant::now();
267    /// let period = Duration::from_secs(1);
268    /// Timer::interval_at(start, period).next().await;
269    /// # });
270    /// ```
271    pub fn interval_at(start: Instant, period: Duration) -> Timer {
272        Timer {
273            id_and_waker: None,
274            when: Some(start),
275            period,
276        }
277    }
278
279    /// Indicates whether or not this timer will ever fire.
280    ///
281    /// [`never()`] will never fire, and timers created with [`after()`] or [`at()`] will fire
282    /// if the duration is not too large.
283    ///
284    /// [`never()`]: Timer::never()
285    /// [`after()`]: Timer::after()
286    /// [`at()`]: Timer::at()
287    ///
288    /// # Examples
289    ///
290    /// ```
291    /// # futures_lite::future::block_on(async {
292    /// use async_io::Timer;
293    /// use futures_lite::prelude::*;
294    /// use std::time::Duration;
295    ///
296    /// // `never` will never fire.
297    /// assert!(!Timer::never().will_fire());
298    ///
299    /// // `after` will fire if the duration is not too large.
300    /// assert!(Timer::after(Duration::from_secs(1)).will_fire());
301    /// assert!(!Timer::after(Duration::MAX).will_fire());
302    ///
303    /// // However, once an `after` timer has fired, it will never fire again.
304    /// let mut t = Timer::after(Duration::from_secs(1));
305    /// assert!(t.will_fire());
306    /// (&mut t).await;
307    /// assert!(!t.will_fire());
308    ///
309    /// // Interval timers will fire periodically.
310    /// let mut t = Timer::interval(Duration::from_secs(1));
311    /// assert!(t.will_fire());
312    /// t.next().await;
313    /// assert!(t.will_fire());
314    /// # });
315    /// ```
316    #[inline]
317    pub fn will_fire(&self) -> bool {
318        self.when.is_some()
319    }
320
321    /// Sets the timer to emit an event once after the given duration of time.
322    ///
323    /// Note that resetting a timer is different from creating a new timer because
324    /// [`set_after()`][`Timer::set_after()`] does not remove the waker associated with the task
325    /// that is polling the timer.
326    ///
327    /// # Examples
328    ///
329    /// ```
330    /// use async_io::Timer;
331    /// use std::time::Duration;
332    ///
333    /// # futures_lite::future::block_on(async {
334    /// let mut t = Timer::after(Duration::from_secs(1));
335    /// t.set_after(Duration::from_millis(100));
336    /// # });
337    /// ```
338    pub fn set_after(&mut self, duration: Duration) {
339        match Instant::now().checked_add(duration) {
340            Some(instant) => self.set_at(instant),
341            None => {
342                // Overflow to never going off.
343                self.clear();
344                self.when = None;
345            }
346        }
347    }
348
349    /// Sets the timer to emit an event once at the given time instant.
350    ///
351    /// Note that resetting a timer is different from creating a new timer because
352    /// [`set_at()`][`Timer::set_at()`] does not remove the waker associated with the task
353    /// that is polling the timer.
354    ///
355    /// # Examples
356    ///
357    /// ```
358    /// use async_io::Timer;
359    /// use std::time::{Duration, Instant};
360    ///
361    /// # futures_lite::future::block_on(async {
362    /// let mut t = Timer::after(Duration::from_secs(1));
363    ///
364    /// let now = Instant::now();
365    /// let when = now + Duration::from_secs(1);
366    /// t.set_at(when);
367    /// # });
368    /// ```
369    pub fn set_at(&mut self, instant: Instant) {
370        self.clear();
371
372        // Update the timeout.
373        self.when = Some(instant);
374
375        if let Some((id, waker)) = self.id_and_waker.as_mut() {
376            // Re-register the timer with the new timeout.
377            *id = Reactor::get().insert_timer(instant, waker);
378        }
379    }
380
381    /// Sets the timer to emit events periodically.
382    ///
383    /// Note that resetting a timer is different from creating a new timer because
384    /// [`set_interval()`][`Timer::set_interval()`] does not remove the waker associated with the
385    /// task that is polling the timer.
386    ///
387    /// # Examples
388    ///
389    /// ```
390    /// use async_io::Timer;
391    /// use futures_lite::StreamExt;
392    /// use std::time::{Duration, Instant};
393    ///
394    /// # futures_lite::future::block_on(async {
395    /// let mut t = Timer::after(Duration::from_secs(1));
396    ///
397    /// let period = Duration::from_secs(2);
398    /// t.set_interval(period);
399    /// # });
400    /// ```
401    pub fn set_interval(&mut self, period: Duration) {
402        match Instant::now().checked_add(period) {
403            Some(instant) => self.set_interval_at(instant, period),
404            None => {
405                // Overflow to never going off.
406                self.clear();
407                self.when = None;
408            }
409        }
410    }
411
412    /// Sets the timer to emit events periodically, starting at `start`.
413    ///
414    /// Note that resetting a timer is different from creating a new timer because
415    /// [`set_interval_at()`][`Timer::set_interval_at()`] does not remove the waker associated with
416    /// the task that is polling the timer.
417    ///
418    /// # Examples
419    ///
420    /// ```
421    /// use async_io::Timer;
422    /// use futures_lite::StreamExt;
423    /// use std::time::{Duration, Instant};
424    ///
425    /// # futures_lite::future::block_on(async {
426    /// let mut t = Timer::after(Duration::from_secs(1));
427    ///
428    /// let start = Instant::now();
429    /// let period = Duration::from_secs(2);
430    /// t.set_interval_at(start, period);
431    /// # });
432    /// ```
433    pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
434        self.clear();
435
436        self.when = Some(start);
437        self.period = period;
438
439        if let Some((id, waker)) = self.id_and_waker.as_mut() {
440            // Re-register the timer with the new timeout.
441            *id = Reactor::get().insert_timer(start, waker);
442        }
443    }
444
445    /// Helper function to clear the current timer.
446    fn clear(&mut self) {
447        if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) {
448            // Deregister the timer from the reactor.
449            Reactor::get().remove_timer(when, *id);
450        }
451    }
452}
453
454impl Drop for Timer {
455    fn drop(&mut self) {
456        if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.take()) {
457            // Deregister the timer from the reactor.
458            Reactor::get().remove_timer(when, id);
459        }
460    }
461}
462
463impl Future for Timer {
464    type Output = Instant;
465
466    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
467        match self.poll_next(cx) {
468            Poll::Ready(Some(when)) => Poll::Ready(when),
469            Poll::Pending => Poll::Pending,
470            Poll::Ready(None) => unreachable!(),
471        }
472    }
473}
474
475impl Stream for Timer {
476    type Item = Instant;
477
478    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
479        let this = self.get_mut();
480
481        if let Some(ref mut when) = this.when {
482            // Check if the timer has already fired.
483            if Instant::now() >= *when {
484                if let Some((id, _)) = this.id_and_waker.take() {
485                    // Deregister the timer from the reactor.
486                    Reactor::get().remove_timer(*when, id);
487                }
488                let result_time = *when;
489                if let Some(next) = (*when).checked_add(this.period) {
490                    *when = next;
491                    // Register the timer in the reactor.
492                    let id = Reactor::get().insert_timer(next, cx.waker());
493                    this.id_and_waker = Some((id, cx.waker().clone()));
494                } else {
495                    this.when = None;
496                }
497                return Poll::Ready(Some(result_time));
498            } else {
499                match &this.id_and_waker {
500                    None => {
501                        // Register the timer in the reactor.
502                        let id = Reactor::get().insert_timer(*when, cx.waker());
503                        this.id_and_waker = Some((id, cx.waker().clone()));
504                    }
505                    Some((id, w)) if !w.will_wake(cx.waker()) => {
506                        // Deregister the timer from the reactor to remove the old waker.
507                        Reactor::get().remove_timer(*when, *id);
508
509                        // Register the timer in the reactor with the new waker.
510                        let id = Reactor::get().insert_timer(*when, cx.waker());
511                        this.id_and_waker = Some((id, cx.waker().clone()));
512                    }
513                    Some(_) => {}
514                }
515            }
516        }
517
518        Poll::Pending
519    }
520}
521
522/// Async adapter for I/O types.
523///
524/// This type puts an I/O handle into non-blocking mode, registers it in
525/// [epoll]/[kqueue]/[event ports]/[IOCP], and then provides an async interface for it.
526///
527/// [epoll]: https://en.wikipedia.org/wiki/Epoll
528/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
529/// [event ports]: https://illumos.org/man/port_create
530/// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
531///
532/// # Caveats
533///
534/// [`Async`] is a low-level primitive, and as such it comes with some caveats.
535///
536/// For higher-level primitives built on top of [`Async`], look into [`async-net`] or
537/// [`async-process`] (on Unix).
538///
539/// The most notable caveat is that it is unsafe to access the inner I/O source mutably
/// using this primitive. Traits like [`AsyncRead`] and [`AsyncWrite`] are not implemented by
541/// default unless it is guaranteed that the resource won't be invalidated by reading or writing.
542/// See the [`IoSafe`] trait for more information.
543///
544/// [`async-net`]: https://github.com/smol-rs/async-net
545/// [`async-process`]: https://github.com/smol-rs/async-process
546/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html
547/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html
548///
549/// ### Supported types
550///
551/// [`Async`] supports all networking types, as well as some OS-specific file descriptors like
552/// [timerfd] and [inotify].
553///
554/// However, do not use [`Async`] with types like [`File`][`std::fs::File`],
555/// [`Stdin`][`std::io::Stdin`], [`Stdout`][`std::io::Stdout`], or [`Stderr`][`std::io::Stderr`]
556/// because all operating systems have issues with them when put in non-blocking mode.
557///
558/// [timerfd]: https://github.com/smol-rs/async-io/blob/master/examples/linux-timerfd.rs
559/// [inotify]: https://github.com/smol-rs/async-io/blob/master/examples/linux-inotify.rs
560///
561/// ### Concurrent I/O
562///
563/// Note that [`&Async<T>`][`Async`] implements [`AsyncRead`] and [`AsyncWrite`] if `&T`
564/// implements those traits, which means tasks can concurrently read and write using shared
565/// references.
566///
/// But there is a catch: only one task can read at a time, and only one task can write at a
/// time. It is okay to have two tasks where one is reading and the other is writing at the same
/// time, but it is not okay to have two tasks reading at the same time or writing at the same
/// time. If you try to do that, conflicting tasks will just keep waking each other in turn, thus
/// wasting CPU time.
572///
573/// Besides [`AsyncRead`] and [`AsyncWrite`], this caveat also applies to
574/// [`poll_readable()`][`Async::poll_readable()`] and
575/// [`poll_writable()`][`Async::poll_writable()`].
576///
577/// However, any number of tasks can be concurrently calling other methods like
578/// [`readable()`][`Async::readable()`] or [`read_with()`][`Async::read_with()`].
579///
580/// ### Closing
581///
582/// Closing the write side of [`Async`] with [`close()`][`futures_lite::AsyncWriteExt::close()`]
583/// simply flushes. If you want to shutdown a TCP or Unix socket, use
584/// [`Shutdown`][`std::net::Shutdown`].
585///
586/// # Examples
587///
588/// Connect to a server and echo incoming messages back to the server:
589///
590/// ```no_run
591/// use async_io::Async;
592/// use futures_lite::io;
593/// use std::net::TcpStream;
594///
595/// # futures_lite::future::block_on(async {
596/// // Connect to a local server.
597/// let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
598///
599/// // Echo all messages from the read side of the stream into the write side.
600/// io::copy(&stream, &stream).await?;
601/// # std::io::Result::Ok(()) });
602/// ```
603///
604/// You can use either predefined async methods or wrap blocking I/O operations in
605/// [`Async::read_with()`], [`Async::read_with_mut()`], [`Async::write_with()`], and
606/// [`Async::write_with_mut()`]:
607///
608/// ```no_run
609/// use async_io::Async;
610/// use std::net::TcpListener;
611///
612/// # futures_lite::future::block_on(async {
613/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
614///
615/// // These two lines are equivalent:
616/// let (stream, addr) = listener.accept().await?;
617/// let (stream, addr) = listener.read_with(|inner| inner.accept()).await?;
618/// # std::io::Result::Ok(()) });
619/// ```
#[derive(Debug)]
pub struct Async<T> {
    /// A source registered in the reactor.
    source: Arc<Source>,

    /// The inner I/O handle.
    ///
    /// Stored as an `Option` so that `into_inner()` can move the handle out of `self`.
    io: Option<T>,
}
628
// `Async<T>` is `Unpin` for every `T`, including handle types that are not `Unpin` themselves.
impl<T> Unpin for Async<T> {}
630
631#[cfg(unix)]
632impl<T: AsFd> Async<T> {
633    /// Creates an async I/O handle.
634    ///
635    /// This method will put the handle in non-blocking mode and register it in
636    /// [epoll]/[kqueue]/[event ports]/[IOCP].
637    ///
638    /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
639    /// `AsSocket`.
640    ///
641    /// [epoll]: https://en.wikipedia.org/wiki/Epoll
642    /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
643    /// [event ports]: https://illumos.org/man/port_create
644    /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
645    ///
646    /// # Examples
647    ///
648    /// ```
649    /// use async_io::Async;
650    /// use std::net::{SocketAddr, TcpListener};
651    ///
652    /// # futures_lite::future::block_on(async {
653    /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
654    /// let listener = Async::new(listener)?;
655    /// # std::io::Result::Ok(()) });
656    /// ```
657    pub fn new(io: T) -> io::Result<Async<T>> {
658        // Put the file descriptor in non-blocking mode.
659        set_nonblocking(io.as_fd())?;
660
661        Self::new_nonblocking(io)
662    }
663
664    /// Creates an async I/O handle without setting it to non-blocking mode.
665    ///
666    /// This method will register the handle in [epoll]/[kqueue]/[event ports]/[IOCP].
667    ///
668    /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
669    /// `AsSocket`.
670    ///
671    /// [epoll]: https://en.wikipedia.org/wiki/Epoll
672    /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
673    /// [event ports]: https://illumos.org/man/port_create
674    /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
675    ///
676    /// # Caveats
677    ///
678    /// The caller should ensure that the handle is set to non-blocking mode or that it is okay if
679    /// it is not set. If not set to non-blocking mode, I/O operations may block the current thread
680    /// and cause a deadlock in an asynchronous context.
681    pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
682        // SAFETY: It is impossible to drop the I/O source while it is registered through
683        // this type.
684        let registration = unsafe { Registration::new(io.as_fd()) };
685
686        Ok(Async {
687            source: Reactor::get().insert_io(registration)?,
688            io: Some(io),
689        })
690    }
691}
692
693#[cfg(unix)]
694impl<T: AsRawFd> AsRawFd for Async<T> {
695    fn as_raw_fd(&self) -> RawFd {
696        self.get_ref().as_raw_fd()
697    }
698}
699
700#[cfg(unix)]
701impl<T: AsFd> AsFd for Async<T> {
702    fn as_fd(&self) -> BorrowedFd<'_> {
703        self.get_ref().as_fd()
704    }
705}
706
707#[cfg(unix)]
708impl<T: AsFd + From<OwnedFd>> TryFrom<OwnedFd> for Async<T> {
709    type Error = io::Error;
710
711    fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
712        Async::new(value.into())
713    }
714}
715
716#[cfg(unix)]
717impl<T: Into<OwnedFd>> TryFrom<Async<T>> for OwnedFd {
718    type Error = io::Error;
719
720    fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
721        value.into_inner().map(Into::into)
722    }
723}
724
725#[cfg(windows)]
726impl<T: AsSocket> Async<T> {
727    /// Creates an async I/O handle.
728    ///
729    /// This method will put the handle in non-blocking mode and register it in
730    /// [epoll]/[kqueue]/[event ports]/[IOCP].
731    ///
732    /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
733    /// `AsSocket`.
734    ///
735    /// [epoll]: https://en.wikipedia.org/wiki/Epoll
736    /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
737    /// [event ports]: https://illumos.org/man/port_create
738    /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
739    ///
740    /// # Examples
741    ///
742    /// ```
743    /// use async_io::Async;
744    /// use std::net::{SocketAddr, TcpListener};
745    ///
746    /// # futures_lite::future::block_on(async {
747    /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
748    /// let listener = Async::new(listener)?;
749    /// # std::io::Result::Ok(()) });
750    /// ```
751    pub fn new(io: T) -> io::Result<Async<T>> {
752        // Put the socket in non-blocking mode.
753        set_nonblocking(io.as_socket())?;
754
755        Self::new_nonblocking(io)
756    }
757
758    /// Creates an async I/O handle without setting it to non-blocking mode.
759    ///
760    /// This method will register the handle in [epoll]/[kqueue]/[event ports]/[IOCP].
761    ///
762    /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
763    /// `AsSocket`.
764    ///
765    /// [epoll]: https://en.wikipedia.org/wiki/Epoll
766    /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
767    /// [event ports]: https://illumos.org/man/port_create
768    /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
769    ///
770    /// # Caveats
771    ///
772    /// The caller should ensure that the handle is set to non-blocking mode or that it is okay if
773    /// it is not set. If not set to non-blocking mode, I/O operations may block the current thread
774    /// and cause a deadlock in an asynchronous context.
775    pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
776        // Create the registration.
777        //
778        // SAFETY: It is impossible to drop the I/O source while it is registered through
779        // this type.
780        let registration = unsafe { Registration::new(io.as_socket()) };
781
782        Ok(Async {
783            source: Reactor::get().insert_io(registration)?,
784            io: Some(io),
785        })
786    }
787}
788
#[cfg(windows)]
impl<T: AsRawSocket> AsRawSocket for Async<T> {
    fn as_raw_socket(&self) -> RawSocket {
        // Forward to the wrapped handle.
        let inner = self.get_ref();
        inner.as_raw_socket()
    }
}
795
#[cfg(windows)]
impl<T: AsSocket> AsSocket for Async<T> {
    fn as_socket(&self) -> BorrowedSocket<'_> {
        // Forward to the wrapped handle.
        let inner = self.get_ref();
        inner.as_socket()
    }
}
802
#[cfg(windows)]
impl<T: AsSocket + From<OwnedSocket>> TryFrom<OwnedSocket> for Async<T> {
    type Error = io::Error;

    fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
        // Convert the owned socket into the concrete handle type, then wrap it.
        let handle = T::from(value);
        Async::new(handle)
    }
}
811
#[cfg(windows)]
impl<T: Into<OwnedSocket>> TryFrom<Async<T>> for OwnedSocket {
    type Error = io::Error;

    fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
        // Unwrap the inner handle first, then convert it into an owned socket.
        let handle = value.into_inner()?;
        Ok(handle.into())
    }
}
820
821impl<T> Async<T> {
822    /// Gets a reference to the inner I/O handle.
823    ///
824    /// # Examples
825    ///
826    /// ```
827    /// use async_io::Async;
828    /// use std::net::TcpListener;
829    ///
830    /// # futures_lite::future::block_on(async {
831    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
832    /// let inner = listener.get_ref();
833    /// # std::io::Result::Ok(()) });
834    /// ```
835    pub fn get_ref(&self) -> &T {
836        self.io.as_ref().unwrap()
837    }
838
839    /// Gets a mutable reference to the inner I/O handle.
840    ///
841    /// # Safety
842    ///
843    /// The underlying I/O source must not be dropped using this function.
844    ///
845    /// # Examples
846    ///
847    /// ```
848    /// use async_io::Async;
849    /// use std::net::TcpListener;
850    ///
851    /// # futures_lite::future::block_on(async {
852    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
853    /// let inner = unsafe { listener.get_mut() };
854    /// # std::io::Result::Ok(()) });
855    /// ```
856    pub unsafe fn get_mut(&mut self) -> &mut T {
857        self.io.as_mut().unwrap()
858    }
859
860    /// Unwraps the inner I/O handle.
861    ///
862    /// This method will **not** put the I/O handle back into blocking mode.
863    ///
864    /// # Examples
865    ///
866    /// ```
867    /// use async_io::Async;
868    /// use std::net::TcpListener;
869    ///
870    /// # futures_lite::future::block_on(async {
871    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
872    /// let inner = listener.into_inner()?;
873    ///
874    /// // Put the listener back into blocking mode.
875    /// inner.set_nonblocking(false)?;
876    /// # std::io::Result::Ok(()) });
877    /// ```
878    pub fn into_inner(mut self) -> io::Result<T> {
879        let io = self.io.take().unwrap();
880        Reactor::get().remove_io(&self.source)?;
881        Ok(io)
882    }
883
884    /// Waits until the I/O handle is readable.
885    ///
886    /// This method completes when a read operation on this I/O handle wouldn't block.
887    ///
888    /// # Examples
889    ///
890    /// ```no_run
891    /// use async_io::Async;
892    /// use std::net::TcpListener;
893    ///
894    /// # futures_lite::future::block_on(async {
895    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
896    ///
897    /// // Wait until a client can be accepted.
898    /// listener.readable().await?;
899    /// # std::io::Result::Ok(()) });
900    /// ```
    pub fn readable(&self) -> Readable<'_, T> {
        // Thin wrapper: `Source::readable` builds the future from a borrow of this `Async`.
        Source::readable(self)
    }
904
    /// Waits until the I/O handle is readable.
    ///
    /// This method completes when a read operation on this I/O handle wouldn't block.
    ///
    /// This is the owned counterpart of [`readable()`][`Async::readable()`]: it consumes an
    /// [`Arc`] of the handle, so the returned future does not borrow from `self`.
    pub fn readable_owned(self: Arc<Self>) -> ReadableOwned<T> {
        Source::readable_owned(self)
    }
911
912    /// Waits until the I/O handle is writable.
913    ///
914    /// This method completes when a write operation on this I/O handle wouldn't block.
915    ///
916    /// # Examples
917    ///
918    /// ```
919    /// use async_io::Async;
920    /// use std::net::{TcpStream, ToSocketAddrs};
921    ///
922    /// # futures_lite::future::block_on(async {
923    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
924    /// let stream = Async::<TcpStream>::connect(addr).await?;
925    ///
926    /// // Wait until the stream is writable.
927    /// stream.writable().await?;
928    /// # std::io::Result::Ok(()) });
929    /// ```
    pub fn writable(&self) -> Writable<'_, T> {
        // Thin wrapper: `Source::writable` builds the future from a borrow of this `Async`.
        Source::writable(self)
    }
933
    /// Waits until the I/O handle is writable.
    ///
    /// This method completes when a write operation on this I/O handle wouldn't block.
    ///
    /// This is the owned counterpart of [`writable()`][`Async::writable()`]: it consumes an
    /// [`Arc`] of the handle, so the returned future does not borrow from `self`.
    pub fn writable_owned(self: Arc<Self>) -> WritableOwned<T> {
        Source::writable_owned(self)
    }
940
941    /// Polls the I/O handle for readability.
942    ///
943    /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
944    /// indicating readability since the last time this task has called the method and received
945    /// [`Poll::Pending`].
946    ///
947    /// # Caveats
948    ///
949    /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
950    /// will just keep waking each other in turn, thus wasting CPU time.
951    ///
952    /// Note that the [`AsyncRead`] implementation for [`Async`] also uses this method.
953    ///
954    /// # Examples
955    ///
956    /// ```no_run
957    /// use async_io::Async;
958    /// use futures_lite::future;
959    /// use std::net::TcpListener;
960    ///
961    /// # futures_lite::future::block_on(async {
962    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
963    ///
964    /// // Wait until a client can be accepted.
965    /// future::poll_fn(|cx| listener.poll_readable(cx)).await?;
966    /// # std::io::Result::Ok(()) });
967    /// ```
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Forwards the task context to `self.source`, which performs the actual poll.
        self.source.poll_readable(cx)
    }
971
972    /// Polls the I/O handle for writability.
973    ///
974    /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
975    /// indicating writability since the last time this task has called the method and received
976    /// [`Poll::Pending`].
977    ///
978    /// # Caveats
979    ///
980    /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
981    /// will just keep waking each other in turn, thus wasting CPU time.
982    ///
983    /// Note that the [`AsyncWrite`] implementation for [`Async`] also uses this method.
984    ///
985    /// # Examples
986    ///
987    /// ```
988    /// use async_io::Async;
989    /// use futures_lite::future;
990    /// use std::net::{TcpStream, ToSocketAddrs};
991    ///
992    /// # futures_lite::future::block_on(async {
993    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
994    /// let stream = Async::<TcpStream>::connect(addr).await?;
995    ///
996    /// // Wait until the stream is writable.
997    /// future::poll_fn(|cx| stream.poll_writable(cx)).await?;
998    /// # std::io::Result::Ok(()) });
999    /// ```
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Forwards the task context to `self.source`, which performs the actual poll.
        self.source.poll_writable(cx)
    }
1003
1004    /// Performs a read operation asynchronously.
1005    ///
1006    /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1007    /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1008    /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1009    /// sends a notification that the I/O handle is readable.
1010    ///
1011    /// The closure receives a shared reference to the I/O handle.
1012    ///
1013    /// # Examples
1014    ///
1015    /// ```no_run
1016    /// use async_io::Async;
1017    /// use std::net::TcpListener;
1018    ///
1019    /// # futures_lite::future::block_on(async {
1020    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1021    ///
1022    /// // Accept a new client asynchronously.
1023    /// let (stream, addr) = listener.read_with(|l| l.accept()).await?;
1024    /// # std::io::Result::Ok(()) });
1025    /// ```
1026    pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
1027        let mut op = op;
1028        loop {
1029            match op(self.get_ref()) {
1030                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1031                res => return res,
1032            }
1033            optimistic(self.readable()).await?;
1034        }
1035    }
1036
1037    /// Performs a read operation asynchronously.
1038    ///
1039    /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1040    /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1041    /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1042    /// sends a notification that the I/O handle is readable.
1043    ///
1044    /// The closure receives a mutable reference to the I/O handle.
1045    ///
1046    /// # Safety
1047    ///
1048    /// In the closure, the underlying I/O source must not be dropped.
1049    ///
1050    /// # Examples
1051    ///
1052    /// ```no_run
1053    /// use async_io::Async;
1054    /// use std::net::TcpListener;
1055    ///
1056    /// # futures_lite::future::block_on(async {
1057    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1058    ///
1059    /// // Accept a new client asynchronously.
1060    /// let (stream, addr) = unsafe { listener.read_with_mut(|l| l.accept()).await? };
1061    /// # std::io::Result::Ok(()) });
1062    /// ```
1063    pub async unsafe fn read_with_mut<R>(
1064        &mut self,
1065        op: impl FnMut(&mut T) -> io::Result<R>,
1066    ) -> io::Result<R> {
1067        let mut op = op;
1068        loop {
1069            match op(self.get_mut()) {
1070                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1071                res => return res,
1072            }
1073            optimistic(self.readable()).await?;
1074        }
1075    }
1076
1077    /// Performs a write operation asynchronously.
1078    ///
1079    /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1080    /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1081    /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1082    /// sends a notification that the I/O handle is writable.
1083    ///
1084    /// The closure receives a shared reference to the I/O handle.
1085    ///
1086    /// # Examples
1087    ///
1088    /// ```no_run
1089    /// use async_io::Async;
1090    /// use std::net::UdpSocket;
1091    ///
1092    /// # futures_lite::future::block_on(async {
1093    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1094    /// socket.get_ref().connect("127.0.0.1:9000")?;
1095    ///
1096    /// let msg = b"hello";
1097    /// let len = socket.write_with(|s| s.send(msg)).await?;
1098    /// # std::io::Result::Ok(()) });
1099    /// ```
1100    pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
1101        let mut op = op;
1102        loop {
1103            match op(self.get_ref()) {
1104                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1105                res => return res,
1106            }
1107            optimistic(self.writable()).await?;
1108        }
1109    }
1110
1111    /// Performs a write operation asynchronously.
1112    ///
1113    /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1114    /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1115    /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1116    /// sends a notification that the I/O handle is writable.
1117    ///
1118    /// # Safety
1119    ///
1120    /// The closure receives a mutable reference to the I/O handle. In the closure, the underlying
1121    /// I/O source must not be dropped.
1122    ///
1123    /// # Examples
1124    ///
1125    /// ```no_run
1126    /// use async_io::Async;
1127    /// use std::net::UdpSocket;
1128    ///
1129    /// # futures_lite::future::block_on(async {
1130    /// let mut socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1131    /// socket.get_ref().connect("127.0.0.1:9000")?;
1132    ///
1133    /// let msg = b"hello";
1134    /// let len = unsafe { socket.write_with_mut(|s| s.send(msg)).await? };
1135    /// # std::io::Result::Ok(()) });
1136    /// ```
1137    pub async unsafe fn write_with_mut<R>(
1138        &mut self,
1139        op: impl FnMut(&mut T) -> io::Result<R>,
1140    ) -> io::Result<R> {
1141        let mut op = op;
1142        loop {
1143            match op(self.get_mut()) {
1144                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1145                res => return res,
1146            }
1147            optimistic(self.writable()).await?;
1148        }
1149    }
1150}
1151
1152impl<T> AsRef<T> for Async<T> {
1153    fn as_ref(&self) -> &T {
1154        self.get_ref()
1155    }
1156}
1157
1158impl<T> Drop for Async<T> {
1159    fn drop(&mut self) {
1160        if self.io.is_some() {
1161            // Deregister and ignore errors because destructors should not panic.
1162            Reactor::get().remove_io(&self.source).ok();
1163
1164            // Drop the I/O handle to close it.
1165            self.io.take();
1166        }
1167    }
1168}
1169
/// Types whose I/O trait implementations do not drop the underlying I/O source.
///
/// The resource contained inside of the [`Async`] must not be invalidated. Such invalidation
/// can happen if the inner resource (the [`TcpStream`], [`UnixListener`] or other `T`) is moved
/// out and dropped before the [`Async`]. Because of this, functions that grant mutable access to
/// the inner type are unsafe, as there is no way to guarantee that the source won't be dropped
/// and a dangling handle won't be left behind.
///
/// Unfortunately this extends to implementations of [`Read`] and [`Write`]. Since methods on those
/// traits take `&mut`, there is no guarantee that the implementor of those traits won't move the
/// source out while the method is being run.
///
/// This trait is an antidote to this predicament. By implementing this trait, the user pledges
/// that using any I/O traits won't destroy the source. This way, [`Async`] can implement the
/// `async` version of these I/O traits, like [`AsyncRead`] and [`AsyncWrite`].
///
/// # Safety
///
/// Any I/O trait implementations for this type must not drop the underlying I/O source. Traits
/// affected by this trait include [`Read`], [`Write`], [`Seek`] and [`BufRead`].
///
/// This trait is already implemented for a number of `libstd` types. In addition, it is
/// implemented for immutable reference types, as it is impossible to invalidate any outstanding
/// references while holding an immutable reference, even with interior mutability. As Rust's
/// current pinning system relies on similar guarantees, I believe that this approach is robust.
///
/// [`BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
/// [`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
/// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
///
/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html
/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html
pub unsafe trait IoSafe {}
1204
/// Reference types can't be mutated.
///
/// The worst thing that can happen is that external state is used to change which file
/// descriptor `as_fd()` returns. For instance:
///
/// ```
/// # #[cfg(unix)] {
/// use std::cell::Cell;
/// use std::net::TcpStream;
/// use std::os::unix::io::{AsFd, BorrowedFd};
///
/// struct Bar {
///     flag: Cell<bool>,
///     a: TcpStream,
///     b: TcpStream
/// }
///
/// impl AsFd for Bar {
///     fn as_fd(&self) -> BorrowedFd<'_> {
///         if self.flag.replace(!self.flag.get()) {
///             self.a.as_fd()
///         } else {
///             self.b.as_fd()
///         }
///     }
/// }
/// # }
/// ```
///
/// We solve this problem by only calling `as_fd()` once to get the original source. Implementations
/// like this are considered buggy (but not unsound) and are thus not really supported by `async-io`.
unsafe impl<T: ?Sized> IoSafe for &T {}
1237
// Can be implemented on top of libstd types.
unsafe impl IoSafe for std::fs::File {}
unsafe impl IoSafe for std::io::Stderr {}
unsafe impl IoSafe for std::io::Stdin {}
unsafe impl IoSafe for std::io::Stdout {}
unsafe impl IoSafe for std::io::StderrLock<'_> {}
unsafe impl IoSafe for std::io::StdinLock<'_> {}
unsafe impl IoSafe for std::io::StdoutLock<'_> {}
unsafe impl IoSafe for std::net::TcpStream {}
unsafe impl IoSafe for std::process::ChildStdin {}
unsafe impl IoSafe for std::process::ChildStdout {}
unsafe impl IoSafe for std::process::ChildStderr {}

// Unix domain sockets only exist on Unix targets.
#[cfg(unix)]
unsafe impl IoSafe for std::os::unix::net::UnixStream {}

// Wrappers and pointer-like types are `IoSafe` only when the type they wrap is `IoSafe` too.
unsafe impl<T: IoSafe + Read> IoSafe for std::io::BufReader<T> {}
unsafe impl<T: IoSafe + Write> IoSafe for std::io::BufWriter<T> {}
unsafe impl<T: IoSafe + Write> IoSafe for std::io::LineWriter<T> {}
unsafe impl<T: IoSafe + ?Sized> IoSafe for &mut T {}
unsafe impl<T: IoSafe + ?Sized> IoSafe for Box<T> {}
unsafe impl<T: Clone + IoSafe> IoSafe for std::borrow::Cow<'_, T> {}
1260
1261impl<T: IoSafe + Read> AsyncRead for Async<T> {
1262    fn poll_read(
1263        mut self: Pin<&mut Self>,
1264        cx: &mut Context<'_>,
1265        buf: &mut [u8],
1266    ) -> Poll<io::Result<usize>> {
1267        loop {
1268            match unsafe { (*self).get_mut() }.read(buf) {
1269                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1270                res => return Poll::Ready(res),
1271            }
1272            ready!(self.poll_readable(cx))?;
1273        }
1274    }
1275
1276    fn poll_read_vectored(
1277        mut self: Pin<&mut Self>,
1278        cx: &mut Context<'_>,
1279        bufs: &mut [IoSliceMut<'_>],
1280    ) -> Poll<io::Result<usize>> {
1281        loop {
1282            match unsafe { (*self).get_mut() }.read_vectored(bufs) {
1283                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1284                res => return Poll::Ready(res),
1285            }
1286            ready!(self.poll_readable(cx))?;
1287        }
1288    }
1289}
1290
1291// Since this is through a reference, we can't mutate the inner I/O source.
1292// Therefore this is safe!
1293impl<T> AsyncRead for &Async<T>
1294where
1295    for<'a> &'a T: Read,
1296{
1297    fn poll_read(
1298        self: Pin<&mut Self>,
1299        cx: &mut Context<'_>,
1300        buf: &mut [u8],
1301    ) -> Poll<io::Result<usize>> {
1302        loop {
1303            match (*self).get_ref().read(buf) {
1304                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1305                res => return Poll::Ready(res),
1306            }
1307            ready!(self.poll_readable(cx))?;
1308        }
1309    }
1310
1311    fn poll_read_vectored(
1312        self: Pin<&mut Self>,
1313        cx: &mut Context<'_>,
1314        bufs: &mut [IoSliceMut<'_>],
1315    ) -> Poll<io::Result<usize>> {
1316        loop {
1317            match (*self).get_ref().read_vectored(bufs) {
1318                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1319                res => return Poll::Ready(res),
1320            }
1321            ready!(self.poll_readable(cx))?;
1322        }
1323    }
1324}
1325
1326impl<T: IoSafe + Write> AsyncWrite for Async<T> {
1327    fn poll_write(
1328        mut self: Pin<&mut Self>,
1329        cx: &mut Context<'_>,
1330        buf: &[u8],
1331    ) -> Poll<io::Result<usize>> {
1332        loop {
1333            match unsafe { (*self).get_mut() }.write(buf) {
1334                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1335                res => return Poll::Ready(res),
1336            }
1337            ready!(self.poll_writable(cx))?;
1338        }
1339    }
1340
1341    fn poll_write_vectored(
1342        mut self: Pin<&mut Self>,
1343        cx: &mut Context<'_>,
1344        bufs: &[IoSlice<'_>],
1345    ) -> Poll<io::Result<usize>> {
1346        loop {
1347            match unsafe { (*self).get_mut() }.write_vectored(bufs) {
1348                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1349                res => return Poll::Ready(res),
1350            }
1351            ready!(self.poll_writable(cx))?;
1352        }
1353    }
1354
1355    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1356        loop {
1357            match unsafe { (*self).get_mut() }.flush() {
1358                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1359                res => return Poll::Ready(res),
1360            }
1361            ready!(self.poll_writable(cx))?;
1362        }
1363    }
1364
1365    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1366        self.poll_flush(cx)
1367    }
1368}
1369
1370impl<T> AsyncWrite for &Async<T>
1371where
1372    for<'a> &'a T: Write,
1373{
1374    fn poll_write(
1375        self: Pin<&mut Self>,
1376        cx: &mut Context<'_>,
1377        buf: &[u8],
1378    ) -> Poll<io::Result<usize>> {
1379        loop {
1380            match (*self).get_ref().write(buf) {
1381                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1382                res => return Poll::Ready(res),
1383            }
1384            ready!(self.poll_writable(cx))?;
1385        }
1386    }
1387
1388    fn poll_write_vectored(
1389        self: Pin<&mut Self>,
1390        cx: &mut Context<'_>,
1391        bufs: &[IoSlice<'_>],
1392    ) -> Poll<io::Result<usize>> {
1393        loop {
1394            match (*self).get_ref().write_vectored(bufs) {
1395                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1396                res => return Poll::Ready(res),
1397            }
1398            ready!(self.poll_writable(cx))?;
1399        }
1400    }
1401
1402    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1403        loop {
1404            match (*self).get_ref().flush() {
1405                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1406                res => return Poll::Ready(res),
1407            }
1408            ready!(self.poll_writable(cx))?;
1409        }
1410    }
1411
1412    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1413        self.poll_flush(cx)
1414    }
1415}
1416
1417impl Async<TcpListener> {
1418    /// Creates a TCP listener bound to the specified address.
1419    ///
1420    /// Binding with port number 0 will request an available port from the OS.
1421    ///
1422    /// # Examples
1423    ///
1424    /// ```
1425    /// use async_io::Async;
1426    /// use std::net::TcpListener;
1427    ///
1428    /// # futures_lite::future::block_on(async {
1429    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1430    /// println!("Listening on {}", listener.get_ref().local_addr()?);
1431    /// # std::io::Result::Ok(()) });
1432    /// ```
1433    pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>> {
1434        let addr = addr.into();
1435        Async::new(TcpListener::bind(addr)?)
1436    }
1437
1438    /// Accepts a new incoming TCP connection.
1439    ///
1440    /// When a connection is established, it will be returned as a TCP stream together with its
1441    /// remote address.
1442    ///
1443    /// # Examples
1444    ///
1445    /// ```no_run
1446    /// use async_io::Async;
1447    /// use std::net::TcpListener;
1448    ///
1449    /// # futures_lite::future::block_on(async {
1450    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1451    /// let (stream, addr) = listener.accept().await?;
1452    /// println!("Accepted client: {}", addr);
1453    /// # std::io::Result::Ok(()) });
1454    /// ```
1455    pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
1456        let (stream, addr) = self.read_with(|io| io.accept()).await?;
1457        Ok((Async::new(stream)?, addr))
1458    }
1459
1460    /// Returns a stream of incoming TCP connections.
1461    ///
1462    /// The stream is infinite, i.e. it never stops with a [`None`].
1463    ///
1464    /// # Examples
1465    ///
1466    /// ```no_run
1467    /// use async_io::Async;
1468    /// use futures_lite::{pin, stream::StreamExt};
1469    /// use std::net::TcpListener;
1470    ///
1471    /// # futures_lite::future::block_on(async {
1472    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1473    /// let incoming = listener.incoming();
1474    /// pin!(incoming);
1475    ///
1476    /// while let Some(stream) = incoming.next().await {
1477    ///     let stream = stream?;
1478    ///     println!("Accepted client: {}", stream.get_ref().peer_addr()?);
1479    /// }
1480    /// # std::io::Result::Ok(()) });
1481    /// ```
1482    pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_ {
1483        stream::unfold(self, |listener| async move {
1484            let res = listener.accept().await.map(|(stream, _)| stream);
1485            Some((res, listener))
1486        })
1487    }
1488}
1489
1490impl TryFrom<std::net::TcpListener> for Async<std::net::TcpListener> {
1491    type Error = io::Error;
1492
1493    fn try_from(listener: std::net::TcpListener) -> io::Result<Self> {
1494        Async::new(listener)
1495    }
1496}
1497
1498impl Async<TcpStream> {
1499    /// Creates a TCP connection to the specified address.
1500    ///
1501    /// # Examples
1502    ///
1503    /// ```
1504    /// use async_io::Async;
1505    /// use std::net::{TcpStream, ToSocketAddrs};
1506    ///
1507    /// # futures_lite::future::block_on(async {
1508    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1509    /// let stream = Async::<TcpStream>::connect(addr).await?;
1510    /// # std::io::Result::Ok(()) });
1511    /// ```
1512    pub async fn connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpStream>> {
1513        // Figure out how to handle this address.
1514        let addr = addr.into();
1515        let (domain, sock_addr) = match addr {
1516            SocketAddr::V4(v4) => (rn::AddressFamily::INET, rn::SocketAddrAny::V4(v4)),
1517            SocketAddr::V6(v6) => (rn::AddressFamily::INET6, rn::SocketAddrAny::V6(v6)),
1518        };
1519
1520        // Begin async connect.
1521        let socket = connect(sock_addr, domain, Some(rn::ipproto::TCP))?;
1522        // Use new_nonblocking because connect already sets socket to non-blocking mode.
1523        let stream = Async::new_nonblocking(TcpStream::from(socket))?;
1524
1525        // The stream becomes writable when connected.
1526        stream.writable().await?;
1527
1528        // Check if there was an error while connecting.
1529        match stream.get_ref().take_error()? {
1530            None => Ok(stream),
1531            Some(err) => Err(err),
1532        }
1533    }
1534
1535    /// Reads data from the stream without removing it from the buffer.
1536    ///
1537    /// Returns the number of bytes read. Successive calls of this method read the same data.
1538    ///
1539    /// # Examples
1540    ///
1541    /// ```
1542    /// use async_io::Async;
1543    /// use futures_lite::{io::AsyncWriteExt, stream::StreamExt};
1544    /// use std::net::{TcpStream, ToSocketAddrs};
1545    ///
1546    /// # futures_lite::future::block_on(async {
1547    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1548    /// let mut stream = Async::<TcpStream>::connect(addr).await?;
1549    ///
1550    /// stream
1551    ///     .write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
1552    ///     .await?;
1553    ///
1554    /// let mut buf = [0u8; 1024];
1555    /// let len = stream.peek(&mut buf).await?;
1556    /// # std::io::Result::Ok(()) });
1557    /// ```
1558    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1559        self.read_with(|io| io.peek(buf)).await
1560    }
1561}
1562
1563impl TryFrom<std::net::TcpStream> for Async<std::net::TcpStream> {
1564    type Error = io::Error;
1565
1566    fn try_from(stream: std::net::TcpStream) -> io::Result<Self> {
1567        Async::new(stream)
1568    }
1569}
1570
1571impl Async<UdpSocket> {
1572    /// Creates a UDP socket bound to the specified address.
1573    ///
1574    /// Binding with port number 0 will request an available port from the OS.
1575    ///
1576    /// # Examples
1577    ///
1578    /// ```
1579    /// use async_io::Async;
1580    /// use std::net::UdpSocket;
1581    ///
1582    /// # futures_lite::future::block_on(async {
1583    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1584    /// println!("Bound to {}", socket.get_ref().local_addr()?);
1585    /// # std::io::Result::Ok(()) });
1586    /// ```
1587    pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>> {
1588        let addr = addr.into();
1589        Async::new(UdpSocket::bind(addr)?)
1590    }
1591
1592    /// Receives a single datagram message.
1593    ///
1594    /// Returns the number of bytes read and the address the message came from.
1595    ///
1596    /// This method must be called with a valid byte slice of sufficient size to hold the message.
1597    /// If the message is too long to fit, excess bytes may get discarded.
1598    ///
1599    /// # Examples
1600    ///
1601    /// ```no_run
1602    /// use async_io::Async;
1603    /// use std::net::UdpSocket;
1604    ///
1605    /// # futures_lite::future::block_on(async {
1606    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1607    ///
1608    /// let mut buf = [0u8; 1024];
1609    /// let (len, addr) = socket.recv_from(&mut buf).await?;
1610    /// # std::io::Result::Ok(()) });
1611    /// ```
1612    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1613        self.read_with(|io| io.recv_from(buf)).await
1614    }
1615
1616    /// Receives a single datagram message without removing it from the queue.
1617    ///
1618    /// Returns the number of bytes read and the address the message came from.
1619    ///
1620    /// This method must be called with a valid byte slice of sufficient size to hold the message.
1621    /// If the message is too long to fit, excess bytes may get discarded.
1622    ///
1623    /// # Examples
1624    ///
1625    /// ```no_run
1626    /// use async_io::Async;
1627    /// use std::net::UdpSocket;
1628    ///
1629    /// # futures_lite::future::block_on(async {
1630    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1631    ///
1632    /// let mut buf = [0u8; 1024];
1633    /// let (len, addr) = socket.peek_from(&mut buf).await?;
1634    /// # std::io::Result::Ok(()) });
1635    /// ```
1636    pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1637        self.read_with(|io| io.peek_from(buf)).await
1638    }
1639
1640    /// Sends data to the specified address.
1641    ///
1642    /// Returns the number of bytes written.
1643    ///
1644    /// # Examples
1645    ///
1646    /// ```no_run
1647    /// use async_io::Async;
1648    /// use std::net::UdpSocket;
1649    ///
1650    /// # futures_lite::future::block_on(async {
1651    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1652    /// let addr = socket.get_ref().local_addr()?;
1653    ///
1654    /// let msg = b"hello";
1655    /// let len = socket.send_to(msg, addr).await?;
1656    /// # std::io::Result::Ok(()) });
1657    /// ```
1658    pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
1659        let addr = addr.into();
1660        self.write_with(|io| io.send_to(buf, addr)).await
1661    }
1662
1663    /// Receives a single datagram message from the connected peer.
1664    ///
1665    /// Returns the number of bytes read.
1666    ///
1667    /// This method must be called with a valid byte slice of sufficient size to hold the message.
1668    /// If the message is too long to fit, excess bytes may get discarded.
1669    ///
1670    /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1671    /// This method will fail if the socket is not connected.
1672    ///
1673    /// # Examples
1674    ///
1675    /// ```no_run
1676    /// use async_io::Async;
1677    /// use std::net::UdpSocket;
1678    ///
1679    /// # futures_lite::future::block_on(async {
1680    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1681    /// socket.get_ref().connect("127.0.0.1:9000")?;
1682    ///
1683    /// let mut buf = [0u8; 1024];
1684    /// let len = socket.recv(&mut buf).await?;
1685    /// # std::io::Result::Ok(()) });
1686    /// ```
1687    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
1688        self.read_with(|io| io.recv(buf)).await
1689    }
1690
1691    /// Receives a single datagram message from the connected peer without removing it from the
1692    /// queue.
1693    ///
    /// Returns the number of bytes read.
1695    ///
1696    /// This method must be called with a valid byte slice of sufficient size to hold the message.
1697    /// If the message is too long to fit, excess bytes may get discarded.
1698    ///
1699    /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1700    /// This method will fail if the socket is not connected.
1701    ///
1702    /// # Examples
1703    ///
1704    /// ```no_run
1705    /// use async_io::Async;
1706    /// use std::net::UdpSocket;
1707    ///
1708    /// # futures_lite::future::block_on(async {
1709    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1710    /// socket.get_ref().connect("127.0.0.1:9000")?;
1711    ///
1712    /// let mut buf = [0u8; 1024];
1713    /// let len = socket.peek(&mut buf).await?;
1714    /// # std::io::Result::Ok(()) });
1715    /// ```
1716    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1717        self.read_with(|io| io.peek(buf)).await
1718    }
1719
1720    /// Sends data to the connected peer.
1721    ///
1722    /// Returns the number of bytes written.
1723    ///
1724    /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1725    /// This method will fail if the socket is not connected.
1726    ///
1727    /// # Examples
1728    ///
1729    /// ```no_run
1730    /// use async_io::Async;
1731    /// use std::net::UdpSocket;
1732    ///
1733    /// # futures_lite::future::block_on(async {
1734    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1735    /// socket.get_ref().connect("127.0.0.1:9000")?;
1736    ///
1737    /// let msg = b"hello";
1738    /// let len = socket.send(msg).await?;
1739    /// # std::io::Result::Ok(()) });
1740    /// ```
1741    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
1742        self.write_with(|io| io.send(buf)).await
1743    }
1744}
1745
1746impl TryFrom<std::net::UdpSocket> for Async<std::net::UdpSocket> {
1747    type Error = io::Error;
1748
1749    fn try_from(socket: std::net::UdpSocket) -> io::Result<Self> {
1750        Async::new(socket)
1751    }
1752}
1753
1754#[cfg(unix)]
1755impl Async<UnixListener> {
1756    /// Creates a UDS listener bound to the specified path.
1757    ///
1758    /// # Examples
1759    ///
1760    /// ```no_run
1761    /// use async_io::Async;
1762    /// use std::os::unix::net::UnixListener;
1763    ///
1764    /// # futures_lite::future::block_on(async {
1765    /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1766    /// println!("Listening on {:?}", listener.get_ref().local_addr()?);
1767    /// # std::io::Result::Ok(()) });
1768    /// ```
1769    pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
1770        let path = path.as_ref().to_owned();
1771        Async::new(UnixListener::bind(path)?)
1772    }
1773
1774    /// Accepts a new incoming UDS stream connection.
1775    ///
1776    /// When a connection is established, it will be returned as a stream together with its remote
1777    /// address.
1778    ///
1779    /// # Examples
1780    ///
1781    /// ```no_run
1782    /// use async_io::Async;
1783    /// use std::os::unix::net::UnixListener;
1784    ///
1785    /// # futures_lite::future::block_on(async {
1786    /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1787    /// let (stream, addr) = listener.accept().await?;
1788    /// println!("Accepted client: {:?}", addr);
1789    /// # std::io::Result::Ok(()) });
1790    /// ```
1791    pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
1792        let (stream, addr) = self.read_with(|io| io.accept()).await?;
1793        Ok((Async::new(stream)?, addr))
1794    }
1795
1796    /// Returns a stream of incoming UDS connections.
1797    ///
1798    /// The stream is infinite, i.e. it never stops with a [`None`] item.
1799    ///
1800    /// # Examples
1801    ///
1802    /// ```no_run
1803    /// use async_io::Async;
1804    /// use futures_lite::{pin, stream::StreamExt};
1805    /// use std::os::unix::net::UnixListener;
1806    ///
1807    /// # futures_lite::future::block_on(async {
1808    /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1809    /// let incoming = listener.incoming();
1810    /// pin!(incoming);
1811    ///
1812    /// while let Some(stream) = incoming.next().await {
1813    ///     let stream = stream?;
1814    ///     println!("Accepted client: {:?}", stream.get_ref().peer_addr()?);
1815    /// }
1816    /// # std::io::Result::Ok(()) });
1817    /// ```
1818    pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_ {
1819        stream::unfold(self, |listener| async move {
1820            let res = listener.accept().await.map(|(stream, _)| stream);
1821            Some((res, listener))
1822        })
1823    }
1824}
1825
1826#[cfg(unix)]
1827impl TryFrom<std::os::unix::net::UnixListener> for Async<std::os::unix::net::UnixListener> {
1828    type Error = io::Error;
1829
1830    fn try_from(listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
1831        Async::new(listener)
1832    }
1833}
1834
1835#[cfg(unix)]
1836impl Async<UnixStream> {
1837    /// Creates a UDS stream connected to the specified path.
1838    ///
1839    /// # Examples
1840    ///
1841    /// ```no_run
1842    /// use async_io::Async;
1843    /// use std::os::unix::net::UnixStream;
1844    ///
1845    /// # futures_lite::future::block_on(async {
1846    /// let stream = Async::<UnixStream>::connect("/tmp/socket").await?;
1847    /// # std::io::Result::Ok(()) });
1848    /// ```
1849    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
1850        let address = convert_path_to_socket_address(path.as_ref())?;
1851
1852        // Begin async connect.
1853        let socket = connect(address.into(), rn::AddressFamily::UNIX, None)?;
1854        // Use new_nonblocking because connect already sets socket to non-blocking mode.
1855        let stream = Async::new_nonblocking(UnixStream::from(socket))?;
1856
1857        // The stream becomes writable when connected.
1858        stream.writable().await?;
1859
1860        // On Linux, it appears the socket may become writable even when connecting fails, so we
1861        // must do an extra check here and see if the peer address is retrievable.
1862        stream.get_ref().peer_addr()?;
1863        Ok(stream)
1864    }
1865
1866    /// Creates an unnamed pair of connected UDS stream sockets.
1867    ///
1868    /// # Examples
1869    ///
1870    /// ```no_run
1871    /// use async_io::Async;
1872    /// use std::os::unix::net::UnixStream;
1873    ///
1874    /// # futures_lite::future::block_on(async {
1875    /// let (stream1, stream2) = Async::<UnixStream>::pair()?;
1876    /// # std::io::Result::Ok(()) });
1877    /// ```
1878    pub fn pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)> {
1879        let (stream1, stream2) = UnixStream::pair()?;
1880        Ok((Async::new(stream1)?, Async::new(stream2)?))
1881    }
1882}
1883
1884#[cfg(unix)]
1885impl TryFrom<std::os::unix::net::UnixStream> for Async<std::os::unix::net::UnixStream> {
1886    type Error = io::Error;
1887
1888    fn try_from(stream: std::os::unix::net::UnixStream) -> io::Result<Self> {
1889        Async::new(stream)
1890    }
1891}
1892
1893#[cfg(unix)]
1894impl Async<UnixDatagram> {
1895    /// Creates a UDS datagram socket bound to the specified path.
1896    ///
1897    /// # Examples
1898    ///
1899    /// ```no_run
1900    /// use async_io::Async;
1901    /// use std::os::unix::net::UnixDatagram;
1902    ///
1903    /// # futures_lite::future::block_on(async {
1904    /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1905    /// # std::io::Result::Ok(()) });
1906    /// ```
1907    pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
1908        let path = path.as_ref().to_owned();
1909        Async::new(UnixDatagram::bind(path)?)
1910    }
1911
1912    /// Creates a UDS datagram socket not bound to any address.
1913    ///
1914    /// # Examples
1915    ///
1916    /// ```no_run
1917    /// use async_io::Async;
1918    /// use std::os::unix::net::UnixDatagram;
1919    ///
1920    /// # futures_lite::future::block_on(async {
1921    /// let socket = Async::<UnixDatagram>::unbound()?;
1922    /// # std::io::Result::Ok(()) });
1923    /// ```
1924    pub fn unbound() -> io::Result<Async<UnixDatagram>> {
1925        Async::new(UnixDatagram::unbound()?)
1926    }
1927
1928    /// Creates an unnamed pair of connected Unix datagram sockets.
1929    ///
1930    /// # Examples
1931    ///
1932    /// ```no_run
1933    /// use async_io::Async;
1934    /// use std::os::unix::net::UnixDatagram;
1935    ///
1936    /// # futures_lite::future::block_on(async {
1937    /// let (socket1, socket2) = Async::<UnixDatagram>::pair()?;
1938    /// # std::io::Result::Ok(()) });
1939    /// ```
1940    pub fn pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)> {
1941        let (socket1, socket2) = UnixDatagram::pair()?;
1942        Ok((Async::new(socket1)?, Async::new(socket2)?))
1943    }
1944
1945    /// Receives data from the socket.
1946    ///
1947    /// Returns the number of bytes read and the address the message came from.
1948    ///
1949    /// # Examples
1950    ///
1951    /// ```no_run
1952    /// use async_io::Async;
1953    /// use std::os::unix::net::UnixDatagram;
1954    ///
1955    /// # futures_lite::future::block_on(async {
1956    /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1957    ///
1958    /// let mut buf = [0u8; 1024];
1959    /// let (len, addr) = socket.recv_from(&mut buf).await?;
1960    /// # std::io::Result::Ok(()) });
1961    /// ```
1962    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> {
1963        self.read_with(|io| io.recv_from(buf)).await
1964    }
1965
1966    /// Sends data to the specified address.
1967    ///
1968    /// Returns the number of bytes written.
1969    ///
1970    /// # Examples
1971    ///
1972    /// ```no_run
1973    /// use async_io::Async;
1974    /// use std::os::unix::net::UnixDatagram;
1975    ///
1976    /// # futures_lite::future::block_on(async {
1977    /// let socket = Async::<UnixDatagram>::unbound()?;
1978    ///
1979    /// let msg = b"hello";
1980    /// let addr = "/tmp/socket";
1981    /// let len = socket.send_to(msg, addr).await?;
1982    /// # std::io::Result::Ok(()) });
1983    /// ```
1984    pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
1985        self.write_with(|io| io.send_to(buf, &path)).await
1986    }
1987
1988    /// Receives data from the connected peer.
1989    ///
1990    /// Returns the number of bytes read and the address the message came from.
1991    ///
1992    /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
1993    /// This method will fail if the socket is not connected.
1994    ///
1995    /// # Examples
1996    ///
1997    /// ```no_run
1998    /// use async_io::Async;
1999    /// use std::os::unix::net::UnixDatagram;
2000    ///
2001    /// # futures_lite::future::block_on(async {
2002    /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
2003    /// socket.get_ref().connect("/tmp/socket2")?;
2004    ///
2005    /// let mut buf = [0u8; 1024];
2006    /// let len = socket.recv(&mut buf).await?;
2007    /// # std::io::Result::Ok(()) });
2008    /// ```
2009    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
2010        self.read_with(|io| io.recv(buf)).await
2011    }
2012
2013    /// Sends data to the connected peer.
2014    ///
2015    /// Returns the number of bytes written.
2016    ///
2017    /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
2018    /// This method will fail if the socket is not connected.
2019    ///
2020    /// # Examples
2021    ///
2022    /// ```no_run
2023    /// use async_io::Async;
2024    /// use std::os::unix::net::UnixDatagram;
2025    ///
2026    /// # futures_lite::future::block_on(async {
2027    /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
2028    /// socket.get_ref().connect("/tmp/socket2")?;
2029    ///
2030    /// let msg = b"hello";
2031    /// let len = socket.send(msg).await?;
2032    /// # std::io::Result::Ok(()) });
2033    /// ```
2034    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
2035        self.write_with(|io| io.send(buf)).await
2036    }
2037}
2038
2039#[cfg(unix)]
2040impl TryFrom<std::os::unix::net::UnixDatagram> for Async<std::os::unix::net::UnixDatagram> {
2041    type Error = io::Error;
2042
2043    fn try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<Self> {
2044        Async::new(socket)
2045    }
2046}
2047
2048/// Polls a future once, waits for a wakeup, and then optimistically assumes the future is ready.
2049async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
2050    let mut polled = false;
2051    pin!(fut);
2052
2053    future::poll_fn(|cx| {
2054        if !polled {
2055            polled = true;
2056            fut.as_mut().poll(cx)
2057        } else {
2058            Poll::Ready(Ok(()))
2059        }
2060    })
2061    .await
2062}
2063
/// Creates a non-blocking stream socket and starts connecting it to `addr`.
///
/// The socket is created for the given `domain` and `protocol` with `SocketType::STREAM`,
/// put into non-blocking mode, and then passed to `connect_any`. A connect result of
/// `INPROGRESS` (Unix only), `AGAIN` or `WOULDBLOCK` is not treated as an error: the socket
/// is returned in that case too, so this function itself does not wait for the connection to
/// be established.
///
/// # Errors
///
/// Returns any error from socket creation, from applying the socket/descriptor options, or
/// from `connect_any` other than the in-progress codes listed above.
fn connect(
    addr: rn::SocketAddrAny,
    domain: rn::AddressFamily,
    protocol: Option<rn::Protocol>,
) -> io::Result<rustix::fd::OwnedFd> {
    // `socket.as_fd()` below needs this trait in scope on Windows.
    // NOTE(review): presumably it is already imported at file level for Unix — confirm.
    #[cfg(windows)]
    use rustix::fd::AsFd;

    setup_networking();

    // On these targets both CLOEXEC and NONBLOCK are passed as creation flags, so no
    // follow-up calls are needed to configure the socket.
    #[cfg(any(
        target_os = "android",
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "fuchsia",
        target_os = "illumos",
        target_os = "linux",
        target_os = "netbsd",
        target_os = "openbsd"
    ))]
    let socket = rn::socket_with(
        domain,
        rn::SocketType::STREAM,
        rn::SocketFlags::CLOEXEC | rn::SocketFlags::NONBLOCK,
        protocol,
    )?;

    // Everywhere else the socket is created first and configured afterwards.
    #[cfg(not(any(
        target_os = "android",
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "fuchsia",
        target_os = "illumos",
        target_os = "linux",
        target_os = "netbsd",
        target_os = "openbsd"
    )))]
    let socket = {
        // CLOEXEC is requested at creation time except on AIX, Apple, ESP-IDF and Windows,
        // where the socket is created with no flags at all.
        #[cfg(not(any(
            target_os = "aix",
            target_vendor = "apple",
            target_os = "espidf",
            windows,
        )))]
        let flags = rn::SocketFlags::CLOEXEC;
        #[cfg(any(
            target_os = "aix",
            target_vendor = "apple",
            target_os = "espidf",
            windows,
        ))]
        let flags = rn::SocketFlags::empty();

        // Create the socket.
        let socket = rn::socket_with(domain, rn::SocketType::STREAM, flags, protocol)?;

        // Set cloexec if necessary.
        // On AIX and Apple targets it is applied after creation via fcntl, preserving the
        // descriptor flags that are already set.
        #[cfg(any(target_os = "aix", target_vendor = "apple"))]
        rio::fcntl_setfd(&socket, rio::fcntl_getfd(&socket)? | rio::FdFlags::CLOEXEC)?;

        // Set non-blocking mode.
        set_nonblocking(socket.as_fd())?;

        socket
    };

    // Set nosigpipe if necessary.
    #[cfg(any(
        target_vendor = "apple",
        target_os = "freebsd",
        target_os = "netbsd",
        target_os = "dragonfly",
    ))]
    rn::sockopt::set_socket_nosigpipe(&socket, true)?;

    // Set the handle information to HANDLE_FLAG_INHERIT.
    //
    // NOTE(review): with HANDLE_FLAG_INHERIT passed as both the mask and the flags value,
    // this call turns handle inheritance ON, whereas the Unix paths above set CLOEXEC
    // (i.e. not inherited). Confirm this is intended; clearing the flag instead would pass
    // 0 as the last argument.
    //
    // SAFETY: `socket` was created above and is still owned by this function, so the raw
    // socket handle passed to `SetHandleInformation` is open for the duration of the call.
    #[cfg(windows)]
    unsafe {
        if windows_sys::Win32::Foundation::SetHandleInformation(
            socket.as_raw_socket() as _,
            windows_sys::Win32::Foundation::HANDLE_FLAG_INHERIT,
            windows_sys::Win32::Foundation::HANDLE_FLAG_INHERIT,
        ) == 0
        {
            return Err(io::Error::last_os_error());
        }
    }

    // Start the connect. On a non-blocking socket an "in progress"/"would block" result
    // means the attempt has been started, so those codes fall through to `Ok(socket)`.
    // NOTE(review): the `unreachable_patterns` allowance is presumably needed because AGAIN
    // and WOULDBLOCK can be the same value on some targets — confirm.
    #[allow(unreachable_patterns)]
    match rn::connect_any(&socket, &addr) {
        Ok(_) => {}
        #[cfg(unix)]
        Err(rio::Errno::INPROGRESS) => {}
        Err(rio::Errno::AGAIN) | Err(rio::Errno::WOULDBLOCK) => {}
        Err(err) => return Err(err.into()),
    }
    Ok(socket)
}
2162
/// Performs the one-time networking initialization required on Windows.
///
/// On every other target this function has an empty body.
#[inline]
fn setup_networking() {
    // Windows requires WSAStartup to have been called before any networking code runs.
    // The `Once` ensures the startup call is issued a single time no matter how often this
    // function is invoked.
    #[cfg(windows)]
    {
        use std::sync::Once;

        static WSA_INIT: Once = Once::new();

        WSA_INIT.call_once(|| {
            // The startup result is intentionally discarded.
            rustix::net::wsa_startup().ok();
        });
    }
}
2176
2177#[inline]
2178fn set_nonblocking(
2179    #[cfg(unix)] fd: BorrowedFd<'_>,
2180    #[cfg(windows)] fd: BorrowedSocket<'_>,
2181) -> io::Result<()> {
2182    cfg_if::cfg_if! {
2183        // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
2184        // for now, as with the standard library, because it seems to behave
2185        // differently depending on the platform.
2186        // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
2187        // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
2188        // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
2189        if #[cfg(any(windows, target_os = "linux"))] {
2190            rustix::io::ioctl_fionbio(fd, true)?;
2191        } else {
2192            let previous = rustix::fs::fcntl_getfl(fd)?;
2193            let new = previous | rustix::fs::OFlags::NONBLOCK;
2194            if new != previous {
2195                rustix::fs::fcntl_setfl(fd, new)?;
2196            }
2197        }
2198    }
2199
2200    Ok(())
2201}
2202
2203/// Converts a `Path` to its socket address representation.
2204///
2205/// This function is abstract socket-aware.
2206#[cfg(unix)]
2207#[inline]
2208fn convert_path_to_socket_address(path: &Path) -> io::Result<rn::SocketAddrUnix> {
2209    // SocketAddrUnix::new() will throw EINVAL when a path with a zero in it is passed in.
2210    // However, some users expect to be able to pass in paths to abstract sockets, which
2211    // triggers this error as it has a zero in it. Therefore, if a path starts with a zero,
2212    // make it an abstract socket.
2213    #[cfg(any(target_os = "linux", target_os = "android"))]
2214    let address = {
2215        use std::os::unix::ffi::OsStrExt;
2216
2217        let path = path.as_os_str();
2218        match path.as_bytes().first() {
2219            Some(0) => rn::SocketAddrUnix::new_abstract_name(path.as_bytes().get(1..).unwrap())?,
2220            _ => rn::SocketAddrUnix::new(path)?,
2221        }
2222    };
2223
2224    // Only Linux and Android support abstract sockets.
2225    #[cfg(not(any(target_os = "linux", target_os = "android")))]
2226    let address = rn::SocketAddrUnix::new(path)?;
2227
2228    Ok(address)
2229}