madsim_real_tokio/io/async_fd.rs
1use crate::io::{Interest, Ready};
2use crate::runtime::io::{ReadyEvent, Registration};
3use crate::runtime::scheduler;
4
5use mio::unix::SourceFd;
6use std::error::Error;
7use std::fmt;
8use std::io;
9use std::os::unix::io::{AsRawFd, RawFd};
10use std::{task::Context, task::Poll};
11
12/// Associates an IO object backed by a Unix file descriptor with the tokio
13/// reactor, allowing for readiness to be polled. The file descriptor must be of
/// a type that can be used with the OS polling facilities (i.e., `poll`, `epoll`,
15/// `kqueue`, etc), such as a network socket or pipe, and the file descriptor
16/// must have the nonblocking mode set to true.
17///
18/// Creating an [`AsyncFd`] registers the file descriptor with the current tokio
19/// Reactor, allowing you to directly await the file descriptor being readable
20/// or writable. Once registered, the file descriptor remains registered until
21/// the [`AsyncFd`] is dropped.
22///
23/// The [`AsyncFd`] takes ownership of an arbitrary object to represent the IO
24/// object. It is intended that this object will handle closing the file
25/// descriptor when it is dropped, avoiding resource leaks and ensuring that the
26/// [`AsyncFd`] can clean up the registration before closing the file descriptor.
27/// The [`AsyncFd::into_inner`] function can be used to extract the inner object
28/// to retake control from the tokio IO reactor.
29///
30/// The inner object is required to implement [`AsRawFd`]. This file descriptor
31/// must not change while [`AsyncFd`] owns the inner object, i.e. the
32/// [`AsRawFd::as_raw_fd`] method on the inner type must always return the same
33/// file descriptor when called multiple times. Failure to uphold this results
34/// in unspecified behavior in the IO driver, which may include breaking
35/// notifications for other sockets/etc.
36///
37/// Polling for readiness is done by calling the async functions [`readable`]
38/// and [`writable`]. These functions complete when the associated readiness
39/// condition is observed. Any number of tasks can query the same `AsyncFd` in
40/// parallel, on the same or different conditions.
41///
42/// On some platforms, the readiness detecting mechanism relies on
43/// edge-triggered notifications. This means that the OS will only notify Tokio
44/// when the file descriptor transitions from not-ready to ready. For this to
45/// work you should first try to read or write and only poll for readiness
46/// if that fails with an error of [`std::io::ErrorKind::WouldBlock`].
47///
48/// Tokio internally tracks when it has received a ready notification, and when
49/// readiness checking functions like [`readable`] and [`writable`] are called,
50/// if the readiness flag is set, these async functions will complete
51/// immediately. This however does mean that it is critical to ensure that this
52/// ready flag is cleared when (and only when) the file descriptor ceases to be
53/// ready. The [`AsyncFdReadyGuard`] returned from readiness checking functions
54/// serves this function; after calling a readiness-checking async function,
55/// you must use this [`AsyncFdReadyGuard`] to signal to tokio whether the file
56/// descriptor is no longer in a ready state.
57///
/// ## Use with a poll-based API
59///
60/// In some cases it may be desirable to use `AsyncFd` from APIs similar to
61/// [`TcpStream::poll_read_ready`]. The [`AsyncFd::poll_read_ready`] and
62/// [`AsyncFd::poll_write_ready`] functions are provided for this purpose.
63/// Because these functions don't create a future to hold their state, they have
64/// the limitation that only one task can wait on each direction (read or write)
65/// at a time.
66///
67/// # Examples
68///
69/// This example shows how to turn [`std::net::TcpStream`] asynchronous using
70/// `AsyncFd`. It implements the read/write operations both as an `async fn`
71/// and using the IO traits [`AsyncRead`] and [`AsyncWrite`].
72///
73/// ```no_run
74/// use futures::ready;
75/// use std::io::{self, Read, Write};
76/// use std::net::TcpStream;
77/// use std::pin::Pin;
78/// use std::task::{Context, Poll};
79/// use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
80/// use tokio::io::unix::AsyncFd;
81///
82/// pub struct AsyncTcpStream {
83/// inner: AsyncFd<TcpStream>,
84/// }
85///
86/// impl AsyncTcpStream {
87/// pub fn new(tcp: TcpStream) -> io::Result<Self> {
88/// tcp.set_nonblocking(true)?;
89/// Ok(Self {
90/// inner: AsyncFd::new(tcp)?,
91/// })
92/// }
93///
94/// pub async fn read(&self, out: &mut [u8]) -> io::Result<usize> {
95/// loop {
96/// let mut guard = self.inner.readable().await?;
97///
98/// match guard.try_io(|inner| inner.get_ref().read(out)) {
99/// Ok(result) => return result,
100/// Err(_would_block) => continue,
101/// }
102/// }
103/// }
104///
105/// pub async fn write(&self, buf: &[u8]) -> io::Result<usize> {
106/// loop {
107/// let mut guard = self.inner.writable().await?;
108///
109/// match guard.try_io(|inner| inner.get_ref().write(buf)) {
110/// Ok(result) => return result,
111/// Err(_would_block) => continue,
112/// }
113/// }
114/// }
115/// }
116///
117/// impl AsyncRead for AsyncTcpStream {
118/// fn poll_read(
119/// self: Pin<&mut Self>,
120/// cx: &mut Context<'_>,
121/// buf: &mut ReadBuf<'_>
122/// ) -> Poll<io::Result<()>> {
123/// loop {
124/// let mut guard = ready!(self.inner.poll_read_ready(cx))?;
125///
126/// let unfilled = buf.initialize_unfilled();
127/// match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
128/// Ok(Ok(len)) => {
129/// buf.advance(len);
130/// return Poll::Ready(Ok(()));
131/// },
132/// Ok(Err(err)) => return Poll::Ready(Err(err)),
133/// Err(_would_block) => continue,
134/// }
135/// }
136/// }
137/// }
138///
139/// impl AsyncWrite for AsyncTcpStream {
140/// fn poll_write(
141/// self: Pin<&mut Self>,
142/// cx: &mut Context<'_>,
143/// buf: &[u8]
144/// ) -> Poll<io::Result<usize>> {
145/// loop {
146/// let mut guard = ready!(self.inner.poll_write_ready(cx))?;
147///
148/// match guard.try_io(|inner| inner.get_ref().write(buf)) {
149/// Ok(result) => return Poll::Ready(result),
150/// Err(_would_block) => continue,
151/// }
152/// }
153/// }
154///
155/// fn poll_flush(
156/// self: Pin<&mut Self>,
157/// cx: &mut Context<'_>,
158/// ) -> Poll<io::Result<()>> {
159/// // tcp flush is a no-op
160/// Poll::Ready(Ok(()))
161/// }
162///
163/// fn poll_shutdown(
164/// self: Pin<&mut Self>,
165/// cx: &mut Context<'_>,
166/// ) -> Poll<io::Result<()>> {
167/// self.inner.get_ref().shutdown(std::net::Shutdown::Write)?;
168/// Poll::Ready(Ok(()))
169/// }
170/// }
171/// ```
172///
173/// [`readable`]: method@Self::readable
174/// [`writable`]: method@Self::writable
175/// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
176/// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream
177/// [`AsyncRead`]: trait@crate::io::AsyncRead
178/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
pub struct AsyncFd<T: AsRawFd> {
    // Registration of the inner object's file descriptor with the reactor.
    // Readiness polling and deregistration in this file both go through it.
    registration: Registration,
    // The inner value is always present. The `Option` is required for `drop` and `into_inner`.
    // In all other methods `unwrap` is valid, and will never panic.
    inner: Option<T>,
}
185
186/// Represents an IO-ready event detected on a particular file descriptor that
187/// has not yet been acknowledged. This is a `must_use` structure to help ensure
188/// that you do not forget to explicitly clear (or not clear) the event.
189///
190/// This type exposes an immutable reference to the underlying IO object.
#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
    // Shared borrow of the `AsyncFd` whose readiness this guard reports.
    async_fd: &'a AsyncFd<T>,
    // Readiness event captured when the guard was created; the constructors
    // visible in this file always store `Some(event)`.
    event: Option<ReadyEvent>,
}
196
197/// Represents an IO-ready event detected on a particular file descriptor that
198/// has not yet been acknowledged. This is a `must_use` structure to help ensure
199/// that you do not forget to explicitly clear (or not clear) the event.
200///
201/// This type exposes a mutable reference to the underlying IO object.
#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
pub struct AsyncFdReadyMutGuard<'a, T: AsRawFd> {
    // Exclusive borrow of the `AsyncFd` whose readiness this guard reports;
    // this is what allows mutable access to the inner IO object.
    async_fd: &'a mut AsyncFd<T>,
    // Readiness event captured when the guard was created; the constructors
    // visible in this file always store `Some(event)`.
    event: Option<ReadyEvent>,
}
207
208impl<T: AsRawFd> AsyncFd<T> {
209 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
210 /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
211 /// time of creation.
212 ///
213 /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
214 /// control, use [`AsyncFd::with_interest`].
215 ///
216 /// This method must be called in the context of a tokio runtime.
217 ///
218 /// # Panics
219 ///
220 /// This function panics if there is no current reactor set, or if the `rt`
221 /// feature flag is not enabled.
222 #[inline]
223 #[track_caller]
224 pub fn new(inner: T) -> io::Result<Self>
225 where
226 T: AsRawFd,
227 {
228 Self::with_interest(inner, Interest::READABLE | Interest::WRITABLE)
229 }
230
231 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
232 /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
233 /// file descriptor is cached at the time of creation.
234 ///
235 /// # Panics
236 ///
237 /// This function panics if there is no current reactor set, or if the `rt`
238 /// feature flag is not enabled.
239 #[inline]
240 #[track_caller]
241 pub fn with_interest(inner: T, interest: Interest) -> io::Result<Self>
242 where
243 T: AsRawFd,
244 {
245 Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
246 }
247
248 #[track_caller]
249 pub(crate) fn new_with_handle_and_interest(
250 inner: T,
251 handle: scheduler::Handle,
252 interest: Interest,
253 ) -> io::Result<Self> {
254 Self::try_new_with_handle_and_interest(inner, handle, interest).map_err(Into::into)
255 }
256
257 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
258 /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
259 /// time of creation.
260 ///
261 /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
262 /// control, use [`AsyncFd::try_with_interest`].
263 ///
264 /// This method must be called in the context of a tokio runtime.
265 ///
266 /// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
267 /// passed to this function.
268 ///
269 /// # Panics
270 ///
271 /// This function panics if there is no current reactor set, or if the `rt`
272 /// feature flag is not enabled.
273 #[inline]
274 #[track_caller]
275 pub fn try_new(inner: T) -> Result<Self, AsyncFdTryNewError<T>>
276 where
277 T: AsRawFd,
278 {
279 Self::try_with_interest(inner, Interest::READABLE | Interest::WRITABLE)
280 }
281
282 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
283 /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
284 /// file descriptor is cached at the time of creation.
285 ///
286 /// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
287 /// passed to this function.
288 ///
289 /// # Panics
290 ///
291 /// This function panics if there is no current reactor set, or if the `rt`
292 /// feature flag is not enabled.
293 #[inline]
294 #[track_caller]
295 pub fn try_with_interest(inner: T, interest: Interest) -> Result<Self, AsyncFdTryNewError<T>>
296 where
297 T: AsRawFd,
298 {
299 Self::try_new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
300 }
301
302 #[track_caller]
303 pub(crate) fn try_new_with_handle_and_interest(
304 inner: T,
305 handle: scheduler::Handle,
306 interest: Interest,
307 ) -> Result<Self, AsyncFdTryNewError<T>> {
308 let fd = inner.as_raw_fd();
309
310 match Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle) {
311 Ok(registration) => Ok(AsyncFd {
312 registration,
313 inner: Some(inner),
314 }),
315 Err(cause) => Err(AsyncFdTryNewError { inner, cause }),
316 }
317 }
318
319 /// Returns a shared reference to the backing object of this [`AsyncFd`].
320 #[inline]
321 pub fn get_ref(&self) -> &T {
322 self.inner.as_ref().unwrap()
323 }
324
325 /// Returns a mutable reference to the backing object of this [`AsyncFd`].
326 #[inline]
327 pub fn get_mut(&mut self) -> &mut T {
328 self.inner.as_mut().unwrap()
329 }
330
331 fn take_inner(&mut self) -> Option<T> {
332 let inner = self.inner.take()?;
333 let fd = inner.as_raw_fd();
334
335 let _ = self.registration.deregister(&mut SourceFd(&fd));
336
337 Some(inner)
338 }
339
340 /// Deregisters this file descriptor and returns ownership of the backing
341 /// object.
342 pub fn into_inner(mut self) -> T {
343 self.take_inner().unwrap()
344 }
345
346 /// Polls for read readiness.
347 ///
348 /// If the file descriptor is not currently ready for reading, this method
349 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
350 /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
351 ///
352 /// Note that on multiple calls to [`poll_read_ready`] or
353 /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
354 /// most recent call is scheduled to receive a wakeup. (However,
355 /// [`poll_write_ready`] retains a second, independent waker).
356 ///
357 /// This method is intended for cases where creating and pinning a future
358 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
359 /// preferred, as this supports polling from multiple tasks at once.
360 ///
361 /// This method takes `&self`, so it is possible to call this method
362 /// concurrently with other methods on this struct. This method only
363 /// provides shared access to the inner IO resource when handling the
364 /// [`AsyncFdReadyGuard`].
365 ///
366 /// [`poll_read_ready`]: method@Self::poll_read_ready
367 /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
368 /// [`poll_write_ready`]: method@Self::poll_write_ready
369 /// [`readable`]: method@Self::readable
370 /// [`Context`]: struct@std::task::Context
371 /// [`Waker`]: struct@std::task::Waker
372 /// [`Waker::wake`]: method@std::task::Waker::wake
373 pub fn poll_read_ready<'a>(
374 &'a self,
375 cx: &mut Context<'_>,
376 ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
377 let event = ready!(self.registration.poll_read_ready(cx))?;
378
379 Poll::Ready(Ok(AsyncFdReadyGuard {
380 async_fd: self,
381 event: Some(event),
382 }))
383 }
384
385 /// Polls for read readiness.
386 ///
387 /// If the file descriptor is not currently ready for reading, this method
388 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
389 /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
390 ///
391 /// Note that on multiple calls to [`poll_read_ready`] or
392 /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
393 /// most recent call is scheduled to receive a wakeup. (However,
394 /// [`poll_write_ready`] retains a second, independent waker).
395 ///
396 /// This method is intended for cases where creating and pinning a future
397 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
398 /// preferred, as this supports polling from multiple tasks at once.
399 ///
400 /// This method takes `&mut self`, so it is possible to access the inner IO
401 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
402 ///
403 /// [`poll_read_ready`]: method@Self::poll_read_ready
404 /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
405 /// [`poll_write_ready`]: method@Self::poll_write_ready
406 /// [`readable`]: method@Self::readable
407 /// [`Context`]: struct@std::task::Context
408 /// [`Waker`]: struct@std::task::Waker
409 /// [`Waker::wake`]: method@std::task::Waker::wake
410 pub fn poll_read_ready_mut<'a>(
411 &'a mut self,
412 cx: &mut Context<'_>,
413 ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
414 let event = ready!(self.registration.poll_read_ready(cx))?;
415
416 Poll::Ready(Ok(AsyncFdReadyMutGuard {
417 async_fd: self,
418 event: Some(event),
419 }))
420 }
421
422 /// Polls for write readiness.
423 ///
424 /// If the file descriptor is not currently ready for writing, this method
425 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
426 /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
427 ///
428 /// Note that on multiple calls to [`poll_write_ready`] or
429 /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
430 /// most recent call is scheduled to receive a wakeup. (However,
431 /// [`poll_read_ready`] retains a second, independent waker).
432 ///
433 /// This method is intended for cases where creating and pinning a future
434 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
435 /// preferred, as this supports polling from multiple tasks at once.
436 ///
437 /// This method takes `&self`, so it is possible to call this method
438 /// concurrently with other methods on this struct. This method only
439 /// provides shared access to the inner IO resource when handling the
440 /// [`AsyncFdReadyGuard`].
441 ///
442 /// [`poll_read_ready`]: method@Self::poll_read_ready
443 /// [`poll_write_ready`]: method@Self::poll_write_ready
444 /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
/// [`writable`]: method@Self::writable
446 /// [`Context`]: struct@std::task::Context
447 /// [`Waker`]: struct@std::task::Waker
448 /// [`Waker::wake`]: method@std::task::Waker::wake
449 pub fn poll_write_ready<'a>(
450 &'a self,
451 cx: &mut Context<'_>,
452 ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
453 let event = ready!(self.registration.poll_write_ready(cx))?;
454
455 Poll::Ready(Ok(AsyncFdReadyGuard {
456 async_fd: self,
457 event: Some(event),
458 }))
459 }
460
461 /// Polls for write readiness.
462 ///
463 /// If the file descriptor is not currently ready for writing, this method
464 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
465 /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
466 ///
467 /// Note that on multiple calls to [`poll_write_ready`] or
468 /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
469 /// most recent call is scheduled to receive a wakeup. (However,
470 /// [`poll_read_ready`] retains a second, independent waker).
471 ///
472 /// This method is intended for cases where creating and pinning a future
473 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
474 /// preferred, as this supports polling from multiple tasks at once.
475 ///
476 /// This method takes `&mut self`, so it is possible to access the inner IO
477 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
478 ///
479 /// [`poll_read_ready`]: method@Self::poll_read_ready
480 /// [`poll_write_ready`]: method@Self::poll_write_ready
481 /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
/// [`writable`]: method@Self::writable
483 /// [`Context`]: struct@std::task::Context
484 /// [`Waker`]: struct@std::task::Waker
485 /// [`Waker::wake`]: method@std::task::Waker::wake
486 pub fn poll_write_ready_mut<'a>(
487 &'a mut self,
488 cx: &mut Context<'_>,
489 ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
490 let event = ready!(self.registration.poll_write_ready(cx))?;
491
492 Poll::Ready(Ok(AsyncFdReadyMutGuard {
493 async_fd: self,
494 event: Some(event),
495 }))
496 }
497
498 /// Waits for any of the requested ready states, returning a
499 /// [`AsyncFdReadyGuard`] that must be dropped to resume
500 /// polling for the requested ready states.
501 ///
502 /// The function may complete without the file descriptor being ready. This is a
503 /// false-positive and attempting an operation will return with
504 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
505 /// [`Ready`] set, so you should always check the returned value and possibly
506 /// wait again if the requested states are not set.
507 ///
508 /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
509 /// When a combined interest is used, it is important to clear only the readiness
510 /// that is actually observed to block. For instance when the combined
511 /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
512 /// read readiness should be cleared using the [`AsyncFdReadyGuard::clear_ready_matching`] method:
513 /// `guard.clear_ready_matching(Ready::READABLE)`.
514 /// Also clearing the write readiness in this case would be incorrect. The [`AsyncFdReadyGuard::clear_ready`]
515 /// method clears all readiness flags.
516 ///
517 /// This method takes `&self`, so it is possible to call this method
518 /// concurrently with other methods on this struct. This method only
519 /// provides shared access to the inner IO resource when handling the
520 /// [`AsyncFdReadyGuard`].
521 ///
522 /// # Examples
523 ///
524 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
525 /// splitting.
526 ///
527 /// ```no_run
528 /// use std::error::Error;
529 /// use std::io;
530 /// use std::io::{Read, Write};
531 /// use std::net::TcpStream;
532 /// use tokio::io::unix::AsyncFd;
533 /// use tokio::io::{Interest, Ready};
534 ///
535 /// #[tokio::main]
536 /// async fn main() -> Result<(), Box<dyn Error>> {
537 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
538 /// stream.set_nonblocking(true)?;
539 /// let stream = AsyncFd::new(stream)?;
540 ///
541 /// loop {
542 /// let mut guard = stream
543 /// .ready(Interest::READABLE | Interest::WRITABLE)
544 /// .await?;
545 ///
546 /// if guard.ready().is_readable() {
547 /// let mut data = vec![0; 1024];
548 /// // Try to read data, this may still fail with `WouldBlock`
549 /// // if the readiness event is a false positive.
550 /// match stream.get_ref().read(&mut data) {
551 /// Ok(n) => {
552 /// println!("read {} bytes", n);
553 /// }
554 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
555 /// // a read has blocked, but a write might still succeed.
556 /// // clear only the read readiness.
557 /// guard.clear_ready_matching(Ready::READABLE);
558 /// continue;
559 /// }
560 /// Err(e) => {
561 /// return Err(e.into());
562 /// }
563 /// }
564 /// }
565 ///
566 /// if guard.ready().is_writable() {
567 /// // Try to write data, this may still fail with `WouldBlock`
568 /// // if the readiness event is a false positive.
569 /// match stream.get_ref().write(b"hello world") {
570 /// Ok(n) => {
571 /// println!("write {} bytes", n);
572 /// }
573 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
574 /// // a write has blocked, but a read might still succeed.
575 /// // clear only the write readiness.
576 /// guard.clear_ready_matching(Ready::WRITABLE);
577 /// continue;
578 /// }
579 /// Err(e) => {
580 /// return Err(e.into());
581 /// }
582 /// }
583 /// }
584 /// }
585 /// }
586 /// ```
587 pub async fn ready(&self, interest: Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
588 let event = self.registration.readiness(interest).await?;
589
590 Ok(AsyncFdReadyGuard {
591 async_fd: self,
592 event: Some(event),
593 })
594 }
595
596 /// Waits for any of the requested ready states, returning a
597 /// [`AsyncFdReadyMutGuard`] that must be dropped to resume
598 /// polling for the requested ready states.
599 ///
600 /// The function may complete without the file descriptor being ready. This is a
601 /// false-positive and attempting an operation will return with
602 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
603 /// [`Ready`] set, so you should always check the returned value and possibly
604 /// wait again if the requested states are not set.
605 ///
606 /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
607 /// When a combined interest is used, it is important to clear only the readiness
608 /// that is actually observed to block. For instance when the combined
609 /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
610 /// read readiness should be cleared using the [`AsyncFdReadyMutGuard::clear_ready_matching`] method:
611 /// `guard.clear_ready_matching(Ready::READABLE)`.
612 /// Also clearing the write readiness in this case would be incorrect.
613 /// The [`AsyncFdReadyMutGuard::clear_ready`] method clears all readiness flags.
614 ///
615 /// This method takes `&mut self`, so it is possible to access the inner IO
616 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
617 ///
618 /// # Examples
619 ///
620 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
621 /// splitting.
622 ///
623 /// ```no_run
624 /// use std::error::Error;
625 /// use std::io;
626 /// use std::io::{Read, Write};
627 /// use std::net::TcpStream;
628 /// use tokio::io::unix::AsyncFd;
629 /// use tokio::io::{Interest, Ready};
630 ///
631 /// #[tokio::main]
632 /// async fn main() -> Result<(), Box<dyn Error>> {
633 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
634 /// stream.set_nonblocking(true)?;
635 /// let mut stream = AsyncFd::new(stream)?;
636 ///
637 /// loop {
638 /// let mut guard = stream
639 /// .ready_mut(Interest::READABLE | Interest::WRITABLE)
640 /// .await?;
641 ///
642 /// if guard.ready().is_readable() {
643 /// let mut data = vec![0; 1024];
644 /// // Try to read data, this may still fail with `WouldBlock`
645 /// // if the readiness event is a false positive.
646 /// match guard.get_inner_mut().read(&mut data) {
647 /// Ok(n) => {
648 /// println!("read {} bytes", n);
649 /// }
650 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
651 /// // a read has blocked, but a write might still succeed.
652 /// // clear only the read readiness.
653 /// guard.clear_ready_matching(Ready::READABLE);
654 /// continue;
655 /// }
656 /// Err(e) => {
657 /// return Err(e.into());
658 /// }
659 /// }
660 /// }
661 ///
662 /// if guard.ready().is_writable() {
663 /// // Try to write data, this may still fail with `WouldBlock`
664 /// // if the readiness event is a false positive.
665 /// match guard.get_inner_mut().write(b"hello world") {
666 /// Ok(n) => {
667 /// println!("write {} bytes", n);
668 /// }
669 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
670 /// // a write has blocked, but a read might still succeed.
671 /// // clear only the write readiness.
672 /// guard.clear_ready_matching(Ready::WRITABLE);
673 /// continue;
674 /// }
675 /// Err(e) => {
676 /// return Err(e.into());
677 /// }
678 /// }
679 /// }
680 /// }
681 /// }
682 /// ```
683 pub async fn ready_mut(
684 &mut self,
685 interest: Interest,
686 ) -> io::Result<AsyncFdReadyMutGuard<'_, T>> {
687 let event = self.registration.readiness(interest).await?;
688
689 Ok(AsyncFdReadyMutGuard {
690 async_fd: self,
691 event: Some(event),
692 })
693 }
694
695 /// Waits for the file descriptor to become readable, returning a
696 /// [`AsyncFdReadyGuard`] that must be dropped to resume read-readiness
697 /// polling.
698 ///
699 /// This method takes `&self`, so it is possible to call this method
700 /// concurrently with other methods on this struct. This method only
701 /// provides shared access to the inner IO resource when handling the
702 /// [`AsyncFdReadyGuard`].
703 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
704 pub async fn readable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
705 self.ready(Interest::READABLE).await
706 }
707
708 /// Waits for the file descriptor to become readable, returning a
709 /// [`AsyncFdReadyMutGuard`] that must be dropped to resume read-readiness
710 /// polling.
711 ///
712 /// This method takes `&mut self`, so it is possible to access the inner IO
713 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
714 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
715 pub async fn readable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
716 self.ready_mut(Interest::READABLE).await
717 }
718
719 /// Waits for the file descriptor to become writable, returning a
720 /// [`AsyncFdReadyGuard`] that must be dropped to resume write-readiness
721 /// polling.
722 ///
723 /// This method takes `&self`, so it is possible to call this method
724 /// concurrently with other methods on this struct. This method only
725 /// provides shared access to the inner IO resource when handling the
726 /// [`AsyncFdReadyGuard`].
727 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
728 pub async fn writable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
729 self.ready(Interest::WRITABLE).await
730 }
731
732 /// Waits for the file descriptor to become writable, returning a
733 /// [`AsyncFdReadyMutGuard`] that must be dropped to resume write-readiness
734 /// polling.
735 ///
736 /// This method takes `&mut self`, so it is possible to access the inner IO
737 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
738 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
739 pub async fn writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
740 self.ready_mut(Interest::WRITABLE).await
741 }
742
743 /// Reads or writes from the file descriptor using a user-provided IO operation.
744 ///
745 /// The `async_io` method is a convenience utility that waits for the file
746 /// descriptor to become ready, and then executes the provided IO operation.
747 /// Since file descriptors may be marked ready spuriously, the closure will
748 /// be called repeatedly until it returns something other than a
749 /// [`WouldBlock`] error. This is done using the following loop:
750 ///
751 /// ```no_run
752 /// # use std::io::{self, Result};
753 /// # struct Dox<T> { inner: T }
754 /// # impl<T> Dox<T> {
755 /// # async fn writable(&self) -> Result<&Self> {
756 /// # Ok(self)
757 /// # }
758 /// # fn try_io<R>(&self, _: impl FnMut(&T) -> Result<R>) -> Result<Result<R>> {
759 /// # panic!()
760 /// # }
761 /// async fn async_io<R>(&self, mut f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
762 /// loop {
763 /// // or `readable` if called with the read interest.
764 /// let guard = self.writable().await?;
765 ///
766 /// match guard.try_io(&mut f) {
767 /// Ok(result) => return result,
768 /// Err(_would_block) => continue,
769 /// }
770 /// }
771 /// }
772 /// # }
773 /// ```
774 ///
775 /// The closure should only return a [`WouldBlock`] error if it has performed
776 /// an IO operation on the file descriptor that failed due to the file descriptor not being
777 /// ready. Returning a [`WouldBlock`] error in any other situation will
778 /// incorrectly clear the readiness flag, which can cause the file descriptor to
779 /// behave incorrectly.
780 ///
781 /// The closure should not perform the IO operation using any of the methods
782 /// defined on the Tokio [`AsyncFd`] type, as this will mess with the
783 /// readiness flag and can cause the file descriptor to behave incorrectly.
784 ///
785 /// This method is not intended to be used with combined interests.
786 /// The closure should perform only one type of IO operation, so it should not
787 /// require more than one ready state. This method may panic or sleep forever
788 /// if it is called with a combined interest.
789 ///
790 /// # Examples
791 ///
792 /// This example sends some bytes on the inner [`std::net::UdpSocket`]. The `async_io`
793 /// method waits for readiness, and retries if the send operation does block. This example
794 /// is equivalent to the one given for [`try_io`].
795 ///
796 /// ```no_run
797 /// use tokio::io::{Interest, unix::AsyncFd};
798 ///
799 /// use std::io;
800 /// use std::net::UdpSocket;
801 ///
802 /// #[tokio::main]
803 /// async fn main() -> io::Result<()> {
804 /// let socket = UdpSocket::bind("0.0.0.0:8080")?;
805 /// socket.set_nonblocking(true)?;
806 /// let async_fd = AsyncFd::new(socket)?;
807 ///
808 /// let written = async_fd
809 /// .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2]))
810 /// .await?;
811 ///
812 /// println!("wrote {written} bytes");
813 ///
814 /// Ok(())
815 /// }
816 /// ```
817 ///
818 /// [`try_io`]: AsyncFdReadyGuard::try_io
819 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
820 pub async fn async_io<R>(
821 &self,
822 interest: Interest,
823 mut f: impl FnMut(&T) -> io::Result<R>,
824 ) -> io::Result<R> {
825 self.registration
826 .async_io(interest, || f(self.get_ref()))
827 .await
828 }
829
830 /// Reads or writes from the file descriptor using a user-provided IO operation.
831 ///
832 /// The behavior is the same as [`async_io`], except that the closure can mutate the inner
833 /// value of the [`AsyncFd`].
834 ///
835 /// [`async_io`]: AsyncFd::async_io
836 pub async fn async_io_mut<R>(
837 &mut self,
838 interest: Interest,
839 mut f: impl FnMut(&mut T) -> io::Result<R>,
840 ) -> io::Result<R> {
841 self.registration
842 .async_io(interest, || f(self.inner.as_mut().unwrap()))
843 .await
844 }
845}
846
847impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
848 fn as_raw_fd(&self) -> RawFd {
849 self.inner.as_ref().unwrap().as_raw_fd()
850 }
851}
852
853impl<T: AsRawFd> std::os::unix::io::AsFd for AsyncFd<T> {
854 fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
855 unsafe { std::os::unix::io::BorrowedFd::borrow_raw(self.as_raw_fd()) }
856 }
857}
858
859impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> {
860 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
861 f.debug_struct("AsyncFd")
862 .field("inner", &self.inner)
863 .finish()
864 }
865}
866
867impl<T: AsRawFd> Drop for AsyncFd<T> {
868 fn drop(&mut self) {
869 let _ = self.take_inner();
870 }
871}
872
873impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
874 /// Indicates to tokio that the file descriptor is no longer ready. All
875 /// internal readiness flags will be cleared, and tokio will wait for the
876 /// next edge-triggered readiness notification from the OS.
877 ///
878 /// This function is commonly used with guards returned by [`AsyncFd::readable`] and
879 /// [`AsyncFd::writable`].
880 ///
881 /// It is critical that this function not be called unless your code
882 /// _actually observes_ that the file descriptor is _not_ ready. Do not call
883 /// it simply because, for example, a read succeeded; it should be called
884 /// when a read is observed to block.
885 ///
886 /// This method only clears readiness events that happened before the creation of this guard.
887 /// In other words, if the IO resource becomes ready between the creation of the guard and
888 /// this call to `clear_ready`, then the readiness is not actually cleared.
889 pub fn clear_ready(&mut self) {
890 if let Some(event) = self.event.take() {
891 self.async_fd.registration.clear_readiness(event);
892 }
893 }
894
895 /// Indicates to tokio that the file descriptor no longer has a specific readiness.
896 /// The internal readiness flag will be cleared, and tokio will wait for the
897 /// next edge-triggered readiness notification from the OS.
898 ///
899 /// This function is useful in combination with the [`AsyncFd::ready`] method when a
900 /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used.
901 ///
902 /// It is critical that this function not be called unless your code
903 /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`.
904 /// Do not call it simply because, for example, a read succeeded; it should be called
905 /// when a read is observed to block. Only clear the specific readiness that is observed to
906 /// block. For example when a read blocks when using a combined interest,
907 /// only clear `Ready::READABLE`.
908 ///
    /// This method only clears readiness events that happened before the creation of this guard.
    /// In other words, if the IO resource becomes ready between the creation of the guard and
    /// this call to `clear_ready_matching`, then the readiness is not actually cleared.
912 ///
913 /// # Examples
914 ///
915 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
916 /// splitting.
917 ///
918 /// ```no_run
919 /// use std::error::Error;
920 /// use std::io;
921 /// use std::io::{Read, Write};
922 /// use std::net::TcpStream;
923 /// use tokio::io::unix::AsyncFd;
924 /// use tokio::io::{Interest, Ready};
925 ///
926 /// #[tokio::main]
927 /// async fn main() -> Result<(), Box<dyn Error>> {
928 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
929 /// stream.set_nonblocking(true)?;
930 /// let stream = AsyncFd::new(stream)?;
931 ///
932 /// loop {
933 /// let mut guard = stream
934 /// .ready(Interest::READABLE | Interest::WRITABLE)
935 /// .await?;
936 ///
937 /// if guard.ready().is_readable() {
938 /// let mut data = vec![0; 1024];
939 /// // Try to read data, this may still fail with `WouldBlock`
940 /// // if the readiness event is a false positive.
941 /// match stream.get_ref().read(&mut data) {
942 /// Ok(n) => {
943 /// println!("read {} bytes", n);
944 /// }
945 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
946 /// // a read has blocked, but a write might still succeed.
947 /// // clear only the read readiness.
948 /// guard.clear_ready_matching(Ready::READABLE);
949 /// continue;
950 /// }
951 /// Err(e) => {
952 /// return Err(e.into());
953 /// }
954 /// }
955 /// }
956 ///
957 /// if guard.ready().is_writable() {
958 /// // Try to write data, this may still fail with `WouldBlock`
959 /// // if the readiness event is a false positive.
960 /// match stream.get_ref().write(b"hello world") {
961 /// Ok(n) => {
962 /// println!("write {} bytes", n);
963 /// }
964 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
965 /// // a write has blocked, but a read might still succeed.
966 /// // clear only the write readiness.
967 /// guard.clear_ready_matching(Ready::WRITABLE);
968 /// continue;
969 /// }
970 /// Err(e) => {
971 /// return Err(e.into());
972 /// }
973 /// }
974 /// }
975 /// }
976 /// }
977 /// ```
978 pub fn clear_ready_matching(&mut self, ready: Ready) {
979 if let Some(mut event) = self.event.take() {
980 self.async_fd
981 .registration
982 .clear_readiness(event.with_ready(ready));
983
984 // the event is no longer ready for the readiness that was just cleared
985 event.ready = event.ready - ready;
986
987 if !event.ready.is_empty() {
988 self.event = Some(event);
989 }
990 }
991 }
992
    /// This method should be invoked when you intentionally want to keep the
    /// ready flag asserted.
    ///
    /// While this function is itself a no-op, it satisfies the `#[must_use]`
    /// constraint on the [`AsyncFdReadyGuard`] type. The guard's readiness
    /// event is left exactly as it was.
    pub fn retain_ready(&mut self) {
        // no-op: deliberately leaves the guard's readiness event untouched
    }
1001
1002 /// Get the [`Ready`] value associated with this guard.
1003 ///
1004 /// This method will return the empty readiness state if
1005 /// [`AsyncFdReadyGuard::clear_ready`] has been called on
1006 /// the guard.
1007 ///
1008 /// [`Ready`]: crate::io::Ready
1009 pub fn ready(&self) -> Ready {
1010 match &self.event {
1011 Some(event) => event.ready,
1012 None => Ready::EMPTY,
1013 }
1014 }
1015
1016 /// Performs the provided IO operation.
1017 ///
    /// If `f` returns a [`WouldBlock`] error, the readiness state associated
    /// with this file descriptor is cleared, and the method returns an `Err`
    /// holding a [`TryIoError`]. You will typically need to poll the
    /// `AsyncFd` again when this happens.
1022 ///
1023 /// This method helps ensure that the readiness state of the underlying file
1024 /// descriptor remains in sync with the tokio-side readiness state, by
1025 /// clearing the tokio-side state only when a [`WouldBlock`] condition
1026 /// occurs. It is the responsibility of the caller to ensure that `f`
1027 /// returns [`WouldBlock`] only if the file descriptor that originated this
1028 /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
1029 /// create this `AsyncFdReadyGuard`.
1030 ///
1031 /// # Examples
1032 ///
1033 /// This example sends some bytes to the inner [`std::net::UdpSocket`]. Waiting
1034 /// for write-readiness and retrying when the send operation does block are explicit.
1035 /// This example can be written more succinctly using [`AsyncFd::async_io`].
1036 ///
1037 /// ```no_run
1038 /// use tokio::io::unix::AsyncFd;
1039 ///
1040 /// use std::io;
1041 /// use std::net::UdpSocket;
1042 ///
1043 /// #[tokio::main]
1044 /// async fn main() -> io::Result<()> {
1045 /// let socket = UdpSocket::bind("0.0.0.0:8080")?;
1046 /// socket.set_nonblocking(true)?;
1047 /// let async_fd = AsyncFd::new(socket)?;
1048 ///
1049 /// let written = loop {
1050 /// let mut guard = async_fd.writable().await?;
1051 /// match guard.try_io(|inner| inner.get_ref().send(&[1, 2])) {
1052 /// Ok(result) => {
1053 /// break result?;
1054 /// }
1055 /// Err(_would_block) => {
1056 /// // try_io already cleared the file descriptor's readiness state
1057 /// continue;
1058 /// }
1059 /// }
1060 /// };
1061 ///
1062 /// println!("wrote {written} bytes");
1063 ///
1064 /// Ok(())
1065 /// }
1066 /// ```
1067 ///
1068 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1069 // Alias for old name in 0.x
1070 #[cfg_attr(docsrs, doc(alias = "with_io"))]
1071 pub fn try_io<R>(
1072 &mut self,
1073 f: impl FnOnce(&'a AsyncFd<Inner>) -> io::Result<R>,
1074 ) -> Result<io::Result<R>, TryIoError> {
1075 let result = f(self.async_fd);
1076
1077 match result {
1078 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1079 self.clear_ready();
1080 Err(TryIoError(()))
1081 }
1082 result => Ok(result),
1083 }
1084 }
1085
    /// Returns a shared reference to the inner [`AsyncFd`].
    ///
    /// The returned reference carries the guard's `'a` lifetime, so it is not
    /// tied to the borrow of the guard itself.
    pub fn get_ref(&self) -> &'a AsyncFd<Inner> {
        self.async_fd
    }
1090
1091 /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
1092 pub fn get_inner(&self) -> &'a Inner {
1093 self.get_ref().get_ref()
1094 }
1095}
1096
1097impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> {
1098 /// Indicates to tokio that the file descriptor is no longer ready. All
1099 /// internal readiness flags will be cleared, and tokio will wait for the
1100 /// next edge-triggered readiness notification from the OS.
1101 ///
1102 /// This function is commonly used with guards returned by [`AsyncFd::readable_mut`] and
1103 /// [`AsyncFd::writable_mut`].
1104 ///
1105 /// It is critical that this function not be called unless your code
1106 /// _actually observes_ that the file descriptor is _not_ ready. Do not call
1107 /// it simply because, for example, a read succeeded; it should be called
1108 /// when a read is observed to block.
1109 ///
1110 /// This method only clears readiness events that happened before the creation of this guard.
1111 /// In other words, if the IO resource becomes ready between the creation of the guard and
1112 /// this call to `clear_ready`, then the readiness is not actually cleared.
1113 pub fn clear_ready(&mut self) {
1114 if let Some(event) = self.event.take() {
1115 self.async_fd.registration.clear_readiness(event);
1116 }
1117 }
1118
1119 /// Indicates to tokio that the file descriptor no longer has a specific readiness.
1120 /// The internal readiness flag will be cleared, and tokio will wait for the
1121 /// next edge-triggered readiness notification from the OS.
1122 ///
1123 /// This function is useful in combination with the [`AsyncFd::ready_mut`] method when a
1124 /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used.
1125 ///
1126 /// It is critical that this function not be called unless your code
1127 /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`.
1128 /// Do not call it simply because, for example, a read succeeded; it should be called
1129 /// when a read is observed to block. Only clear the specific readiness that is observed to
1130 /// block. For example when a read blocks when using a combined interest,
1131 /// only clear `Ready::READABLE`.
1132 ///
    /// This method only clears readiness events that happened before the creation of this guard.
    /// In other words, if the IO resource becomes ready between the creation of the guard and
    /// this call to `clear_ready_matching`, then the readiness is not actually cleared.
1136 ///
1137 /// # Examples
1138 ///
1139 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
1140 /// splitting.
1141 ///
1142 /// ```no_run
1143 /// use std::error::Error;
1144 /// use std::io;
1145 /// use std::io::{Read, Write};
1146 /// use std::net::TcpStream;
1147 /// use tokio::io::unix::AsyncFd;
1148 /// use tokio::io::{Interest, Ready};
1149 ///
1150 /// #[tokio::main]
1151 /// async fn main() -> Result<(), Box<dyn Error>> {
1152 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
1153 /// stream.set_nonblocking(true)?;
1154 /// let mut stream = AsyncFd::new(stream)?;
1155 ///
1156 /// loop {
1157 /// let mut guard = stream
1158 /// .ready_mut(Interest::READABLE | Interest::WRITABLE)
1159 /// .await?;
1160 ///
1161 /// if guard.ready().is_readable() {
1162 /// let mut data = vec![0; 1024];
1163 /// // Try to read data, this may still fail with `WouldBlock`
1164 /// // if the readiness event is a false positive.
1165 /// match guard.get_inner_mut().read(&mut data) {
1166 /// Ok(n) => {
1167 /// println!("read {} bytes", n);
1168 /// }
1169 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1170 /// // a read has blocked, but a write might still succeed.
1171 /// // clear only the read readiness.
1172 /// guard.clear_ready_matching(Ready::READABLE);
1173 /// continue;
1174 /// }
1175 /// Err(e) => {
1176 /// return Err(e.into());
1177 /// }
1178 /// }
1179 /// }
1180 ///
1181 /// if guard.ready().is_writable() {
1182 /// // Try to write data, this may still fail with `WouldBlock`
1183 /// // if the readiness event is a false positive.
1184 /// match guard.get_inner_mut().write(b"hello world") {
1185 /// Ok(n) => {
1186 /// println!("write {} bytes", n);
1187 /// }
1188 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1189 /// // a write has blocked, but a read might still succeed.
1190 /// // clear only the write readiness.
1191 /// guard.clear_ready_matching(Ready::WRITABLE);
1192 /// continue;
1193 /// }
1194 /// Err(e) => {
1195 /// return Err(e.into());
1196 /// }
1197 /// }
1198 /// }
1199 /// }
1200 /// }
1201 /// ```
1202 pub fn clear_ready_matching(&mut self, ready: Ready) {
1203 if let Some(mut event) = self.event.take() {
1204 self.async_fd
1205 .registration
1206 .clear_readiness(event.with_ready(ready));
1207
1208 // the event is no longer ready for the readiness that was just cleared
1209 event.ready = event.ready - ready;
1210
1211 if !event.ready.is_empty() {
1212 self.event = Some(event);
1213 }
1214 }
1215 }
1216
    /// This method should be invoked when you intentionally want to keep the
    /// ready flag asserted.
    ///
    /// While this function is itself a no-op, it satisfies the `#[must_use]`
    /// constraint on the [`AsyncFdReadyMutGuard`] type. The guard's readiness
    /// event is left exactly as it was.
    pub fn retain_ready(&mut self) {
        // no-op: deliberately leaves the guard's readiness event untouched
    }
1225
1226 /// Get the [`Ready`] value associated with this guard.
1227 ///
1228 /// This method will return the empty readiness state if
1229 /// [`AsyncFdReadyGuard::clear_ready`] has been called on
1230 /// the guard.
1231 ///
1232 /// [`Ready`]: super::Ready
1233 pub fn ready(&self) -> Ready {
1234 match &self.event {
1235 Some(event) => event.ready,
1236 None => Ready::EMPTY,
1237 }
1238 }
1239
1240 /// Performs the provided IO operation.
1241 ///
    /// If `f` returns a [`WouldBlock`] error, the readiness state associated
    /// with this file descriptor is cleared, and the method returns an `Err`
    /// holding a [`TryIoError`]. You will typically need to poll the
    /// `AsyncFd` again when this happens.
1246 ///
1247 /// This method helps ensure that the readiness state of the underlying file
1248 /// descriptor remains in sync with the tokio-side readiness state, by
1249 /// clearing the tokio-side state only when a [`WouldBlock`] condition
1250 /// occurs. It is the responsibility of the caller to ensure that `f`
1251 /// returns [`WouldBlock`] only if the file descriptor that originated this
    /// `AsyncFdReadyMutGuard` no longer expresses the readiness state that was queried to
    /// create this `AsyncFdReadyMutGuard`.
1254 ///
1255 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1256 pub fn try_io<R>(
1257 &mut self,
1258 f: impl FnOnce(&mut AsyncFd<Inner>) -> io::Result<R>,
1259 ) -> Result<io::Result<R>, TryIoError> {
1260 let result = f(self.async_fd);
1261
1262 match result {
1263 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1264 self.clear_ready();
1265 Err(TryIoError(()))
1266 }
1267 result => Ok(result),
1268 }
1269 }
1270
    /// Returns a shared reference to the inner [`AsyncFd`].
    ///
    /// The reference is only valid for as long as the guard itself is borrowed.
    pub fn get_ref(&self) -> &AsyncFd<Inner> {
        self.async_fd
    }
1275
    /// Returns a mutable reference to the inner [`AsyncFd`].
    ///
    /// The reference is only valid for as long as the guard itself is mutably
    /// borrowed.
    pub fn get_mut(&mut self) -> &mut AsyncFd<Inner> {
        self.async_fd
    }
1280
1281 /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
1282 pub fn get_inner(&self) -> &Inner {
1283 self.get_ref().get_ref()
1284 }
1285
1286 /// Returns a mutable reference to the backing object of the inner [`AsyncFd`].
1287 pub fn get_inner_mut(&mut self) -> &mut Inner {
1288 self.get_mut().get_mut()
1289 }
1290}
1291
1292impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
1293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1294 f.debug_struct("ReadyGuard")
1295 .field("async_fd", &self.async_fd)
1296 .finish()
1297 }
1298}
1299
1300impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyMutGuard<'a, T> {
1301 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1302 f.debug_struct("MutReadyGuard")
1303 .field("async_fd", &self.async_fd)
1304 .finish()
1305 }
1306}
1307
/// The error type returned by [`try_io`].
///
/// This error indicates that the IO resource returned a [`WouldBlock`] error.
/// It carries no further information; the original `io::Error` is not kept.
///
/// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
/// [`try_io`]: method@AsyncFdReadyGuard::try_io
#[derive(Debug)]
// The private unit field keeps the type from being constructed outside this module.
pub struct TryIoError(());
1316
/// Error returned by [`try_new`] or [`try_with_interest`].
///
/// Besides the underlying I/O error, it keeps the object that was handed to the
/// failing constructor so that the caller can get it back via [`into_parts`].
///
/// [`try_new`]: AsyncFd::try_new
/// [`try_with_interest`]: AsyncFd::try_with_interest
/// [`into_parts`]: AsyncFdTryNewError::into_parts
pub struct AsyncFdTryNewError<T> {
    /// The object originally passed to the constructor.
    inner: T,
    /// The I/O error that made the constructor fail.
    cause: io::Error,
}

impl<T> AsyncFdTryNewError<T> {
    /// Returns the original object passed to [`try_new`] or [`try_with_interest`]
    /// alongside the error that caused these functions to fail.
    ///
    /// [`try_new`]: AsyncFd::try_new
    /// [`try_with_interest`]: AsyncFd::try_with_interest
    pub fn into_parts(self) -> (T, io::Error) {
        let Self { inner, cause } = self;
        (inner, cause)
    }
}

/// Displays exactly like the underlying I/O error.
impl<T> fmt::Display for AsyncFdTryNewError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <io::Error as fmt::Display>::fmt(&self.cause, f)
    }
}

/// Debug-formats exactly like the underlying I/O error; the inner object is not
/// printed, so no `Debug` bound on `T` is needed.
impl<T> fmt::Debug for AsyncFdTryNewError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <io::Error as fmt::Debug>::fmt(&self.cause, f)
    }
}

/// Reports the underlying I/O error as the error source.
impl<T> Error for AsyncFdTryNewError<T> {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        let cause: &(dyn Error + 'static) = &self.cause;
        Some(cause)
    }
}

/// Converts into the underlying I/O error, dropping the inner object.
impl<T> From<AsyncFdTryNewError<T>> for io::Error {
    fn from(value: AsyncFdTryNewError<T>) -> Self {
        let AsyncFdTryNewError { cause, .. } = value;
        cause
    }
}