async_io/os/kqueue.rs
1//! Functionality that is only available for `kqueue`-based platforms.
2
3use __private::QueueableSealed;
4
5use crate::reactor::{Reactor, Readable, Registration};
6use crate::Async;
7
8use std::future::Future;
9use std::io::{Error, Result};
10use std::num::NonZeroI32;
11use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
12use std::pin::Pin;
13use std::process::Child;
14use std::task::{Context, Poll};
15
16/// A wrapper around a queueable object that waits until it is ready.
17///
18/// The underlying `kqueue` implementation can be used to poll for events besides file descriptor
19/// read/write readiness. This API makes these faculties available to the user.
20///
21/// See the [`Queueable`] trait and its implementors for objects that currently support being registered
22/// into the reactor.
23#[derive(Debug)]
24pub struct Filter<T>(Async<T>);
25
26impl<T> AsRef<T> for Filter<T> {
27 fn as_ref(&self) -> &T {
28 self.0.as_ref()
29 }
30}
31
32impl<T> AsMut<T> for Filter<T> {
33 fn as_mut(&mut self) -> &mut T {
34 self.get_mut()
35 }
36}
37
38impl<T: Queueable> Filter<T> {
39 /// Create a new [`Filter`] around a [`Queueable`].
40 ///
41 /// # Examples
42 ///
43 /// ```no_run
44 /// use std::process::Command;
45 /// use async_io::os::kqueue::{Exit, Filter};
46 ///
47 /// // Create a new process to wait for.
48 /// let mut child = Command::new("sleep").arg("5").spawn().unwrap();
49 ///
50 /// // Wrap the process in an `Async` object that waits for it to exit.
51 /// let process = Filter::new(Exit::new(child)).unwrap();
52 ///
53 /// // Wait for the process to exit.
54 /// # async_io::block_on(async {
55 /// process.ready().await.unwrap();
56 /// # });
57 /// ```
58 pub fn new(mut filter: T) -> Result<Self> {
59 Ok(Self(Async {
60 source: Reactor::get().insert_io(filter.registration())?,
61 io: Some(filter),
62 }))
63 }
64}
65
66impl<T: AsRawFd> AsRawFd for Filter<T> {
67 fn as_raw_fd(&self) -> RawFd {
68 self.0.as_raw_fd()
69 }
70}
71
72impl<T: AsFd> AsFd for Filter<T> {
73 fn as_fd(&self) -> BorrowedFd<'_> {
74 self.0.as_fd()
75 }
76}
77
78impl<T: AsFd + From<OwnedFd>> TryFrom<OwnedFd> for Filter<T> {
79 type Error = Error;
80
81 fn try_from(fd: OwnedFd) -> Result<Self> {
82 Ok(Self(Async::try_from(fd)?))
83 }
84}
85
86impl<T: Into<OwnedFd>> TryFrom<Filter<T>> for OwnedFd {
87 type Error = Error;
88
89 fn try_from(filter: Filter<T>) -> Result<Self> {
90 filter.0.try_into()
91 }
92}
93
94impl<T> Filter<T> {
95 /// Gets a reference to the underlying [`Queueable`] object.
96 ///
97 /// # Examples
98 ///
99 /// ```
100 /// use async_io::os::kqueue::{Exit, Filter};
101 ///
102 /// # futures_lite::future::block_on(async {
103 /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
104 /// let process = Filter::new(Exit::new(child)).unwrap();
105 /// let inner = process.get_ref();
106 /// # });
107 /// ```
108 pub fn get_ref(&self) -> &T {
109 self.0.get_ref()
110 }
111
112 /// Gets a mutable reference to the underlying [`Queueable`] object.
113 ///
114 /// Unlike in [`Async`], this method is safe to call, since dropping the [`Filter`] will
115 /// not cause any undefined behavior.
116 ///
117 /// # Examples
118 ///
119 /// ```
120 /// use async_io::os::kqueue::{Exit, Filter};
121 ///
122 /// # futures_lite::future::block_on(async {
123 /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
124 /// let mut process = Filter::new(Exit::new(child)).unwrap();
125 /// let inner = process.get_mut();
126 /// # });
127 /// ```
128 pub fn get_mut(&mut self) -> &mut T {
129 unsafe { self.0.get_mut() }
130 }
131
132 /// Unwraps the inner [`Queueable`] object.
133 ///
134 /// # Examples
135 ///
136 /// ```
137 /// use async_io::os::kqueue::{Exit, Filter};
138 ///
139 /// # futures_lite::future::block_on(async {
140 /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap();
141 /// let process = Filter::new(Exit::new(child)).unwrap();
142 /// let inner = process.into_inner().unwrap();
143 /// # });
144 /// ```
145 pub fn into_inner(self) -> Result<T> {
146 self.0.into_inner()
147 }
148
149 /// Waits until the [`Queueable`] object is ready.
150 ///
151 /// This method completes when the underlying [`Queueable`] object has completed. See the documentation
152 /// for the [`Queueable`] object for more information.
153 ///
154 /// # Examples
155 ///
156 /// ```no_run
157 /// use std::process::Command;
158 /// use async_io::os::kqueue::{Exit, Filter};
159 ///
160 /// # futures_lite::future::block_on(async {
161 /// let child = Command::new("sleep").arg("5").spawn()?;
162 /// let process = Filter::new(Exit::new(child))?;
163 ///
164 /// // Wait for the process to exit.
165 /// process.ready().await?;
166 /// # std::io::Result::Ok(()) });
167 /// ```
168 pub fn ready(&self) -> Ready<'_, T> {
169 Ready(self.0.readable())
170 }
171
172 /// Polls the I/O handle for readiness.
173 ///
174 /// When this method returns [`Poll::Ready`], that means that the OS has delivered a notification
175 /// that the underlying [`Queueable`] object is ready. See the documentation for the [`Queueable`]
176 /// object for more information.
177 ///
178 /// # Caveats
179 ///
180 /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
181 /// will just keep waking each other in turn, thus wasting CPU time.
182 ///
183 /// # Examples
184 ///
185 /// ```no_run
186 /// use std::process::Command;
187 /// use async_io::os::kqueue::{Exit, Filter};
188 /// use futures_lite::future;
189 ///
190 /// # futures_lite::future::block_on(async {
191 /// let child = Command::new("sleep").arg("5").spawn()?;
192 /// let process = Filter::new(Exit::new(child))?;
193 ///
194 /// // Wait for the process to exit.
195 /// future::poll_fn(|cx| process.poll_ready(cx)).await?;
196 /// # std::io::Result::Ok(()) });
197 /// ```
198 pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
199 self.0.poll_readable(cx)
200 }
201}
202
203/// Future for [`Filter::ready`].
204#[must_use = "futures do nothing unless you `.await` or poll them"]
205#[derive(Debug)]
206pub struct Ready<'a, T>(Readable<'a, T>);
207
208impl<T> Future for Ready<'_, T> {
209 type Output = Result<()>;
210
211 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
212 Pin::new(&mut self.0).poll(cx)
213 }
214}
215
216/// Objects that can be registered into the reactor via a [`Async`](crate::Async).
217///
218/// These objects represent other filters associated with the `kqueue` runtime aside from readability
219/// and writability. Rather than waiting on readable/writable, they wait on "readiness". This is
220/// typically used for signals and child process exits.
221pub trait Queueable: QueueableSealed {}
222
/// An object representing a signal.
///
/// When registered into the reactor via [`Filter::new`], the resulting [`Filter`] becomes
/// ready (see [`Filter::ready`]) when the signal is received.
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub struct Signal(pub i32);
229
230impl QueueableSealed for Signal {
231 fn registration(&mut self) -> Registration {
232 Registration::Signal(*self)
233 }
234}
235impl Queueable for Signal {}
236
/// Wait for a child process to exit.
///
/// When registered into the reactor via [`Filter::new`], the resulting [`Filter`] becomes
/// ready (see [`Filter::ready`]) when the child process exits.
#[derive(Debug)]
pub struct Exit(NonZeroI32);

impl Exit {
    /// Create a new `Exit` object.
    ///
    /// The [`Child`] handle is consumed; only its process ID is kept.
    ///
    /// # Panics
    ///
    /// Panics if the child's process ID does not fit into an `i32`, or if it is zero.
    pub fn new(child: Child) -> Self {
        // `Child::id` yields a `u32`, while the stored PID is a non-zero `i32`.
        let pid = i32::try_from(child.id()).expect("unable to parse pid");
        Self(NonZeroI32::new(pid).expect("cannot register pid with zero value"))
    }

    /// Create a new `Exit` object from a PID.
    ///
    /// # Safety
    ///
    /// The PID must be tied to an actual child process.
    pub unsafe fn from_pid(pid: NonZeroI32) -> Self {
        Self(pid)
    }
}
262
263impl QueueableSealed for Exit {
264 fn registration(&mut self) -> Registration {
265 Registration::Process(self.0)
266 }
267}
268impl Queueable for Exit {}
269
270mod __private {
271 use crate::reactor::Registration;
272
273 #[doc(hidden)]
274 pub trait QueueableSealed {
275 /// Get a registration object for this filter.
276 fn registration(&mut self) -> Registration;
277 }
278}