compio_fs/
async_fd.rs

1use std::io;
2#[cfg(unix)]
3use std::os::fd::{FromRawFd, RawFd};
4#[cfg(windows)]
5use std::os::windows::io::{
6    AsRawHandle, AsRawSocket, FromRawHandle, FromRawSocket, RawHandle, RawSocket,
7};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
10use compio_driver::{
11    AsRawFd, SharedFd, ToSharedFd,
12    op::{BufResultExt, Recv, Send},
13};
14use compio_io::{AsyncRead, AsyncWrite};
15use compio_runtime::Attacher;
16#[cfg(unix)]
17use {
18    compio_buf::{IoVectoredBuf, IoVectoredBufMut},
19    compio_driver::op::{RecvVectored, SendVectored},
20};
21
22/// A wrapper for IO source, providing implementations for [`AsyncRead`] and
23/// [`AsyncWrite`].
24#[derive(Debug)]
25pub struct AsyncFd<T: AsRawFd> {
26    inner: Attacher<T>,
27}
28
29impl<T: AsRawFd> AsyncFd<T> {
30    /// Create [`AsyncFd`] and attach the source to the current runtime.
31    pub fn new(source: T) -> io::Result<Self> {
32        Ok(Self {
33            inner: Attacher::new(source)?,
34        })
35    }
36
37    /// Create [`AsyncFd`] without attaching the source.
38    ///
39    /// # Safety
40    ///
41    /// The user should handle the attachment correctly.
42    pub unsafe fn new_unchecked(source: T) -> Self {
43        Self {
44            inner: Attacher::new_unchecked(source),
45        }
46    }
47}
48
49impl<T: AsRawFd + 'static> AsyncRead for AsyncFd<T> {
50    #[inline]
51    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
52        (&*self).read(buf).await
53    }
54
55    #[cfg(unix)]
56    #[inline]
57    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
58        (&*self).read_vectored(buf).await
59    }
60}
61
62impl<T: AsRawFd + 'static> AsyncRead for &AsyncFd<T> {
63    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
64        let fd = self.inner.to_shared_fd();
65        let op = Recv::new(fd, buf);
66        compio_runtime::submit(op).await.into_inner().map_advanced()
67    }
68
69    #[cfg(unix)]
70    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
71        let fd = self.inner.to_shared_fd();
72        let op = RecvVectored::new(fd, buf);
73        compio_runtime::submit(op).await.into_inner().map_advanced()
74    }
75}
76
77impl<T: AsRawFd + 'static> AsyncWrite for AsyncFd<T> {
78    #[inline]
79    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
80        (&*self).write(buf).await
81    }
82
83    #[cfg(unix)]
84    #[inline]
85    async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
86        (&*self).write_vectored(buf).await
87    }
88
89    #[inline]
90    async fn flush(&mut self) -> io::Result<()> {
91        (&*self).flush().await
92    }
93
94    #[inline]
95    async fn shutdown(&mut self) -> io::Result<()> {
96        (&*self).shutdown().await
97    }
98}
99
100impl<T: AsRawFd + 'static> AsyncWrite for &AsyncFd<T> {
101    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
102        let fd = self.inner.to_shared_fd();
103        let op = Send::new(fd, buf);
104        compio_runtime::submit(op).await.into_inner()
105    }
106
107    #[cfg(unix)]
108    async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
109        let fd = self.inner.to_shared_fd();
110        let op = SendVectored::new(fd, buf);
111        compio_runtime::submit(op).await.into_inner()
112    }
113
114    async fn flush(&mut self) -> io::Result<()> {
115        Ok(())
116    }
117
118    async fn shutdown(&mut self) -> io::Result<()> {
119        Ok(())
120    }
121}
122
123impl<T: AsRawFd> IntoInner for AsyncFd<T> {
124    type Inner = SharedFd<T>;
125
126    fn into_inner(self) -> Self::Inner {
127        self.inner.into_inner()
128    }
129}
130
131impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
132    fn as_raw_fd(&self) -> compio_driver::RawFd {
133        self.inner.as_raw_fd()
134    }
135}
136
#[cfg(windows)]
impl<T> AsRawHandle for AsyncFd<T>
where
    T: AsRawFd + AsRawHandle,
{
    /// Return the raw handle reported by the inner [`Attacher`].
    fn as_raw_handle(&self) -> RawHandle {
        self.inner.as_raw_handle()
    }
}
143
#[cfg(windows)]
impl<T> AsRawSocket for AsyncFd<T>
where
    T: AsRawFd + AsRawSocket,
{
    /// Return the raw socket reported by the inner [`Attacher`].
    fn as_raw_socket(&self) -> RawSocket {
        self.inner.as_raw_socket()
    }
}
150
151impl<T: AsRawFd> ToSharedFd<T> for AsyncFd<T> {
152    fn to_shared_fd(&self) -> SharedFd<T> {
153        self.inner.to_shared_fd()
154    }
155}
156
157impl<T: AsRawFd> Clone for AsyncFd<T> {
158    fn clone(&self) -> Self {
159        Self {
160            inner: self.inner.clone(),
161        }
162    }
163}
164
165#[cfg(unix)]
166impl<T: AsRawFd + FromRawFd> FromRawFd for AsyncFd<T> {
167    unsafe fn from_raw_fd(fd: RawFd) -> Self {
168        Self::new_unchecked(FromRawFd::from_raw_fd(fd))
169    }
170}
171
#[cfg(windows)]
impl<T: AsRawFd + FromRawHandle> FromRawHandle for AsyncFd<T> {
    /// Build an [`AsyncFd`] from a raw handle.
    ///
    /// The source is constructed with `T::from_raw_handle` and wrapped via
    /// [`AsyncFd::new_unchecked`], i.e. without attaching it.
    ///
    /// # Safety
    ///
    /// The caller must uphold the contract of
    /// [`FromRawHandle::from_raw_handle`] for `handle`, and must handle the
    /// attachment correctly as required by [`AsyncFd::new_unchecked`].
    unsafe fn from_raw_handle(handle: RawHandle) -> Self {
        // SAFETY: both calls inherit their obligations from this function's
        // own contract: the caller guarantees `handle` is valid for
        // `T::from_raw_handle` and takes responsibility for attachment.
        unsafe { Self::new_unchecked(<T as FromRawHandle>::from_raw_handle(handle)) }
    }
}
178
#[cfg(windows)]
impl<T: AsRawFd + FromRawSocket> FromRawSocket for AsyncFd<T> {
    /// Build an [`AsyncFd`] from a raw socket.
    ///
    /// The source is constructed with `T::from_raw_socket` and wrapped via
    /// [`AsyncFd::new_unchecked`], i.e. without attaching it.
    ///
    /// # Safety
    ///
    /// The caller must uphold the contract of
    /// [`FromRawSocket::from_raw_socket`] for `sock`, and must handle the
    /// attachment correctly as required by [`AsyncFd::new_unchecked`].
    unsafe fn from_raw_socket(sock: RawSocket) -> Self {
        // SAFETY: both calls inherit their obligations from this function's
        // own contract: the caller guarantees `sock` is valid for
        // `T::from_raw_socket` and takes responsibility for attachment.
        unsafe { Self::new_unchecked(<T as FromRawSocket>::from_raw_socket(sock)) }
    }
}