compio_driver/
op.rs

1//! The async operations.
2//!
//! Types in this module represent the low-level operations passed to the kernel.
4//! The operation itself doesn't perform anything.
5//! You need to pass them to [`crate::Proactor`], and poll the driver.
6
7use std::{marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};
10use socket2::SockAddr;
11
12#[cfg(windows)]
13pub use crate::sys::op::ConnectNamedPipe;
14pub use crate::sys::op::{
15    Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo,
16    SendToVectored, SendVectored,
17};
18#[cfg(unix)]
19pub use crate::sys::op::{
20    CreateDir, CreateSocket, FileStat, HardLink, Interest, OpenFile, PathStat, PollOnce,
21    ReadVectoredAt, Rename, Symlink, Unlink, WriteVectoredAt,
22};
23use crate::{
24    OwnedFd, SharedFd,
25    sys::{sockaddr_storage, socklen_t},
26};
27
/// Trait to update the buffer length inside the [`BufResult`].
///
/// The implementations in this module cover results whose success value
/// starts with one or two `usize` counts; each count is forwarded to
/// [`SetBufInit::set_buf_init`] on the corresponding buffer.
pub trait BufResultExt {
    /// Call [`SetBufInit::set_buf_init`] if the result is [`Ok`].
    ///
    /// Consumes the result and gives back a value of the same type.
    fn map_advanced(self) -> Self;
}
33
34impl<T: SetBufInit> BufResultExt for BufResult<usize, T> {
35    fn map_advanced(self) -> Self {
36        self.map_res(|res| (res, ()))
37            .map_advanced()
38            .map_res(|(res, _)| res)
39    }
40}
41
42impl<T: SetBufInit, O> BufResultExt for BufResult<(usize, O), T> {
43    fn map_advanced(self) -> Self {
44        self.map(|(init, obj), mut buffer| {
45            unsafe {
46                buffer.set_buf_init(init);
47            }
48            ((init, obj), buffer)
49        })
50    }
51}
52
53impl<T: SetBufInit, C: SetBufInit, O> BufResultExt for BufResult<(usize, usize, O), (T, C)> {
54    fn map_advanced(self) -> Self {
55        self.map(
56            |(init_buffer, init_control, obj), (mut buffer, mut control)| {
57                unsafe {
58                    buffer.set_buf_init(init_buffer);
59                    control.set_buf_init(init_control);
60                }
61                ((init_buffer, init_control, obj), (buffer, control))
62            },
63        )
64    }
65}
66
/// Helper trait for [`RecvFrom`], [`RecvFromVectored`] and [`RecvMsg`].
///
/// The implementations in this module accept a result whose buffer part still
/// carries the raw address storage and its length, and move the address into
/// the success value as a [`SockAddr`], leaving only the user buffer behind.
pub trait RecvResultExt {
    /// The mapped result.
    type RecvResult;

    /// Create [`SockAddr`] if the result is [`Ok`].
    fn map_addr(self) -> Self::RecvResult;
}
75
76impl<T> RecvResultExt for BufResult<usize, (T, sockaddr_storage, socklen_t)> {
77    type RecvResult = BufResult<(usize, SockAddr), T>;
78
79    fn map_addr(self) -> Self::RecvResult {
80        self.map_buffer(|(buffer, addr_buffer, addr_size)| (buffer, addr_buffer, addr_size, 0))
81            .map_addr()
82            .map_res(|(res, _, addr)| (res, addr))
83    }
84}
85
86impl<T> RecvResultExt for BufResult<usize, (T, sockaddr_storage, socklen_t, usize)> {
87    type RecvResult = BufResult<(usize, usize, SockAddr), T>;
88
89    fn map_addr(self) -> Self::RecvResult {
90        self.map2(
91            |res, (buffer, addr_buffer, addr_size, len)| {
92                let addr = unsafe { SockAddr::new(addr_buffer, addr_size) };
93                ((res, len, addr), buffer)
94            },
95            |(buffer, ..)| buffer,
96        )
97    }
98}
99
100/// Spawn a blocking function in the thread pool.
101pub struct Asyncify<F, D> {
102    pub(crate) f: Option<F>,
103    pub(crate) data: Option<D>,
104    _p: PhantomPinned,
105}
106
107impl<F, D> Asyncify<F, D> {
108    /// Create [`Asyncify`].
109    pub fn new(f: F) -> Self {
110        Self {
111            f: Some(f),
112            data: None,
113            _p: PhantomPinned,
114        }
115    }
116}
117
118impl<F, D> IntoInner for Asyncify<F, D> {
119    type Inner = D;
120
121    fn into_inner(mut self) -> Self::Inner {
122        self.data.take().expect("the data should not be None")
123    }
124}
125
126/// Close the file fd.
127pub struct CloseFile {
128    pub(crate) fd: ManuallyDrop<OwnedFd>,
129}
130
131impl CloseFile {
132    /// Create [`CloseFile`].
133    pub fn new(fd: OwnedFd) -> Self {
134        Self {
135            fd: ManuallyDrop::new(fd),
136        }
137    }
138}
139
140/// Read a file at specified position into specified buffer.
141#[derive(Debug)]
142pub struct ReadAt<T: IoBufMut, S> {
143    pub(crate) fd: SharedFd<S>,
144    pub(crate) offset: u64,
145    pub(crate) buffer: T,
146    #[cfg(aio)]
147    pub(crate) aiocb: libc::aiocb,
148    _p: PhantomPinned,
149}
150
151impl<T: IoBufMut, S> ReadAt<T, S> {
152    /// Create [`ReadAt`].
153    pub fn new(fd: SharedFd<S>, offset: u64, buffer: T) -> Self {
154        Self {
155            fd,
156            offset,
157            buffer,
158            #[cfg(aio)]
159            aiocb: unsafe { std::mem::zeroed() },
160            _p: PhantomPinned,
161        }
162    }
163}
164
165impl<T: IoBufMut, S> IntoInner for ReadAt<T, S> {
166    type Inner = T;
167
168    fn into_inner(self) -> Self::Inner {
169        self.buffer
170    }
171}
172
173/// Write a file at specified position from specified buffer.
174#[derive(Debug)]
175pub struct WriteAt<T: IoBuf, S> {
176    pub(crate) fd: SharedFd<S>,
177    pub(crate) offset: u64,
178    pub(crate) buffer: T,
179    #[cfg(aio)]
180    pub(crate) aiocb: libc::aiocb,
181    _p: PhantomPinned,
182}
183
184impl<T: IoBuf, S> WriteAt<T, S> {
185    /// Create [`WriteAt`].
186    pub fn new(fd: SharedFd<S>, offset: u64, buffer: T) -> Self {
187        Self {
188            fd,
189            offset,
190            buffer,
191            #[cfg(aio)]
192            aiocb: unsafe { std::mem::zeroed() },
193            _p: PhantomPinned,
194        }
195    }
196}
197
198impl<T: IoBuf, S> IntoInner for WriteAt<T, S> {
199    type Inner = T;
200
201    fn into_inner(self) -> Self::Inner {
202        self.buffer
203    }
204}
205
206/// Sync data to the disk.
207pub struct Sync<S> {
208    pub(crate) fd: SharedFd<S>,
209    #[allow(dead_code)]
210    pub(crate) datasync: bool,
211    #[cfg(aio)]
212    pub(crate) aiocb: libc::aiocb,
213}
214
215impl<S> Sync<S> {
216    /// Create [`Sync`].
217    ///
218    /// If `datasync` is `true`, the file metadata may not be synchronized.
219    pub fn new(fd: SharedFd<S>, datasync: bool) -> Self {
220        Self {
221            fd,
222            datasync,
223            #[cfg(aio)]
224            aiocb: unsafe { std::mem::zeroed() },
225        }
226    }
227}
228
229/// Shutdown a socket.
230pub struct ShutdownSocket<S> {
231    pub(crate) fd: SharedFd<S>,
232    pub(crate) how: Shutdown,
233}
234
235impl<S> ShutdownSocket<S> {
236    /// Create [`ShutdownSocket`].
237    pub fn new(fd: SharedFd<S>, how: Shutdown) -> Self {
238        Self { fd, how }
239    }
240}
241
242/// Close socket fd.
243pub struct CloseSocket {
244    pub(crate) fd: ManuallyDrop<OwnedFd>,
245}
246
247impl CloseSocket {
248    /// Create [`CloseSocket`].
249    pub fn new(fd: OwnedFd) -> Self {
250        Self {
251            fd: ManuallyDrop::new(fd),
252        }
253    }
254}
255
256/// Connect to a remote address.
257pub struct Connect<S> {
258    pub(crate) fd: SharedFd<S>,
259    pub(crate) addr: SockAddr,
260}
261
262impl<S> Connect<S> {
263    /// Create [`Connect`]. `fd` should be bound.
264    pub fn new(fd: SharedFd<S>, addr: SockAddr) -> Self {
265        Self { fd, addr }
266    }
267}