async_io/os/
kqueue.rs

//! Functionality that is only available for `kqueue`-based platforms.
2
3use __private::QueueableSealed;
4
5use crate::reactor::{Reactor, Readable, Registration};
6use crate::Async;
7
8use std::future::Future;
9use std::io::{Error, Result};
10use std::num::NonZeroI32;
11use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
12use std::pin::Pin;
13use std::process::Child;
14use std::task::{Context, Poll};
15
16/// A wrapper around a queueable object that waits until it is ready.
17///
18/// The underlying `kqueue` implementation can be used to poll for events besides file descriptor
19/// read/write readiness. This API makes these faculties available to the user.
20///
21/// See the [`Queueable`] trait and its implementors for objects that currently support being registered
22/// into the reactor.
23#[derive(Debug)]
24pub struct Filter<T>(Async<T>);
25
26impl<T> AsRef<T> for Filter<T> {
27    fn as_ref(&self) -> &T {
28        self.0.as_ref()
29    }
30}
31
32impl<T> AsMut<T> for Filter<T> {
33    fn as_mut(&mut self) -> &mut T {
34        self.get_mut()
35    }
36}
37
38impl<T: Queueable> Filter<T> {
39    /// Create a new [`Filter`] around a [`Queueable`].
40    ///
41    /// # Examples
42    ///
43    /// ```no_run
44    /// use std::process::Command;
45    /// use async_io::os::kqueue::{Exit, Filter};
46    ///
47    /// // Create a new process to wait for.
48    /// let mut child = Command::new("sleep").arg("5").spawn().unwrap();
49    ///
50    /// // Wrap the process in an `Async` object that waits for it to exit.
51    /// let process = Filter::new(Exit::new(child)).unwrap();
52    ///
53    /// // Wait for the process to exit.
54    /// # async_io::block_on(async {
55    /// process.ready().await.unwrap();
56    /// # });
57    /// ```
58    pub fn new(mut filter: T) -> Result<Self> {
59        Ok(Self(Async {
60            source: Reactor::get().insert_io(filter.registration())?,
61            io: Some(filter),
62        }))
63    }
64}
65
66impl<T: AsRawFd> AsRawFd for Filter<T> {
67    fn as_raw_fd(&self) -> RawFd {
68        self.0.as_raw_fd()
69    }
70}
71
72impl<T: AsFd> AsFd for Filter<T> {
73    fn as_fd(&self) -> BorrowedFd<'_> {
74        self.0.as_fd()
75    }
76}
77
78impl<T: AsFd + From<OwnedFd>> TryFrom<OwnedFd> for Filter<T> {
79    type Error = Error;
80
81    fn try_from(fd: OwnedFd) -> Result<Self> {
82        Ok(Self(Async::try_from(fd)?))
83    }
84}
85
86impl<T: Into<OwnedFd>> TryFrom<Filter<T>> for OwnedFd {
87    type Error = Error;
88
89    fn try_from(filter: Filter<T>) -> Result<Self> {
90        filter.0.try_into()
91    }
92}
93
94impl<T> Filter<T> {
95    /// Gets a reference to the underlying [`Queueable`] object.
96    ///
97    /// # Examples
98    ///
99    /// ```
100    /// use async_io::os::kqueue::{Exit, Filter};
101    ///
102    /// # futures_lite::future::block_on(async {
103    /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
104    /// let process = Filter::new(Exit::new(child)).unwrap();
105    /// let inner = process.get_ref();
106    /// # });
107    /// ```
108    pub fn get_ref(&self) -> &T {
109        self.0.get_ref()
110    }
111
112    /// Gets a mutable reference to the underlying [`Queueable`] object.
113    ///
114    /// Unlike in [`Async`], this method is safe to call, since dropping the [`Filter`] will
115    /// not cause any undefined behavior.
116    ///
117    /// # Examples
118    ///
119    /// ```
120    /// use async_io::os::kqueue::{Exit, Filter};
121    ///
122    /// # futures_lite::future::block_on(async {
123    /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
124    /// let mut process = Filter::new(Exit::new(child)).unwrap();
125    /// let inner = process.get_mut();
126    /// # });
127    /// ```
128    pub fn get_mut(&mut self) -> &mut T {
129        unsafe { self.0.get_mut() }
130    }
131
132    /// Unwraps the inner [`Queueable`] object.
133    ///
134    /// # Examples
135    ///
136    /// ```
137    /// use async_io::os::kqueue::{Exit, Filter};
138    ///
139    /// # futures_lite::future::block_on(async {
140    /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
141    /// let process = Filter::new(Exit::new(child)).unwrap();
142    /// let inner = process.into_inner().unwrap();
143    /// # });
144    /// ```
145    pub fn into_inner(self) -> Result<T> {
146        self.0.into_inner()
147    }
148
149    /// Waits until the [`Queueable`] object is ready.
150    ///
151    /// This method completes when the underlying [`Queueable`] object has completed. See the documentation
152    /// for the [`Queueable`] object for more information.
153    ///
154    /// # Examples
155    ///
156    /// ```no_run
157    /// use std::process::Command;
158    /// use async_io::os::kqueue::{Exit, Filter};
159    ///
160    /// # futures_lite::future::block_on(async {
161    /// let child = Command::new("sleep").arg("5").spawn()?;
162    /// let process = Filter::new(Exit::new(child))?;
163    ///
164    /// // Wait for the process to exit.
165    /// process.ready().await?;
166    /// # std::io::Result::Ok(()) });
167    /// ```
168    pub fn ready(&self) -> Ready<'_, T> {
169        Ready(self.0.readable())
170    }
171
172    /// Polls the I/O handle for readiness.
173    ///
174    /// When this method returns [`Poll::Ready`], that means that the OS has delivered a notification
175    /// that the underlying [`Queueable`] object is ready. See the documentation for the [`Queueable`]
176    /// object for more information.
177    ///
178    /// # Caveats
179    ///
180    /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
181    /// will just keep waking each other in turn, thus wasting CPU time.
182    ///
183    /// # Examples
184    ///
185    /// ```no_run
186    /// use std::process::Command;
187    /// use async_io::os::kqueue::{Exit, Filter};
188    /// use futures_lite::future;
189    ///
190    /// # futures_lite::future::block_on(async {
191    /// let child = Command::new("sleep").arg("5").spawn()?;
192    /// let process = Filter::new(Exit::new(child))?;
193    ///
194    /// // Wait for the process to exit.
195    /// future::poll_fn(|cx| process.poll_ready(cx)).await?;
196    /// # std::io::Result::Ok(()) });
197    /// ```
198    pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
199        self.0.poll_readable(cx)
200    }
201}
202
203/// Future for [`Filter::ready`].
204#[must_use = "futures do nothing unless you `.await` or poll them"]
205#[derive(Debug)]
206pub struct Ready<'a, T>(Readable<'a, T>);
207
208impl<T> Future for Ready<'_, T> {
209    type Output = Result<()>;
210
211    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
212        Pin::new(&mut self.0).poll(cx)
213    }
214}
215
216/// Objects that can be registered into the reactor via a [`Async`](crate::Async).
217///
218/// These objects represent other filters associated with the `kqueue` runtime aside from readability
219/// and writability. Rather than waiting on readable/writable, they wait on "readiness". This is
220/// typically used for signals and child process exits.
221pub trait Queueable: QueueableSealed {}
222
/// An object representing a signal.
///
/// When registered into the reactor via [`Filter::new`], the future returned by
/// [`Filter::ready`] completes when the signal is received.
///
/// The wrapped `i32` is presumably the raw signal number handed to `kqueue`; confirm
/// against the reactor's handling of `Registration::Signal`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub struct Signal(pub i32);
229
230impl QueueableSealed for Signal {
231    fn registration(&mut self) -> Registration {
232        Registration::Signal(*self)
233    }
234}
235impl Queueable for Signal {}
236
/// Wait for a child process to exit.
///
/// When registered into the reactor via [`Filter::new`], the future returned by
/// [`Filter::ready`] completes when the child process exits.
///
/// The wrapped value is the (non-zero) PID of the process being watched.
#[derive(Debug)]
pub struct Exit(NonZeroI32);
243
244impl Exit {
245    /// Create a new `Exit` object.
246    pub fn new(child: Child) -> Self {
247        Self(
248            NonZeroI32::new(child.id().try_into().expect("unable to parse pid"))
249                .expect("cannot register pid with zero value"),
250        )
251    }
252
253    /// Create a new `Exit` object from a PID.
254    ///
255    /// # Safety
256    ///
257    /// The PID must be tied to an actual child process.
258    pub unsafe fn from_pid(pid: NonZeroI32) -> Self {
259        Self(pid)
260    }
261}
262
263impl QueueableSealed for Exit {
264    fn registration(&mut self) -> Registration {
265        Registration::Process(self.0)
266    }
267}
268impl Queueable for Exit {}
269
270mod __private {
271    use crate::reactor::Registration;
272
273    #[doc(hidden)]
274    pub trait QueueableSealed {
275        /// Get a registration object for this filter.
276        fn registration(&mut self) -> Registration;
277    }
278}