compio_driver/iour/
op.rs

1use std::{
2    ffi::CString,
3    io,
4    marker::PhantomPinned,
5    os::fd::{AsRawFd, FromRawFd, OwnedFd},
6    pin::Pin,
7};
8
9use compio_buf::{
10    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
11};
12use io_uring::{
13    opcode,
14    types::{Fd, FsyncFlags},
15};
16use libc::{sockaddr_storage, socklen_t};
17use socket2::SockAddr;
18
19use super::OpCode;
20pub use crate::unix::op::*;
21use crate::{OpEntry, SharedFd, op::*, syscall};
22
23impl<
24    D: std::marker::Send + 'static,
25    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
26> OpCode for Asyncify<F, D>
27{
28    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
29        OpEntry::Blocking
30    }
31
32    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
33        // Safety: self won't be moved
34        let this = unsafe { self.get_unchecked_mut() };
35        let f = this
36            .f
37            .take()
38            .expect("the operate method could only be called once");
39        let BufResult(res, data) = f();
40        this.data = Some(data);
41        res
42    }
43}
44
45impl OpCode for OpenFile {
46    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
47        opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
48            .flags(self.flags)
49            .mode(self.mode)
50            .build()
51            .into()
52    }
53}
54
55impl OpCode for CloseFile {
56    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
57        opcode::Close::new(Fd(self.fd.as_raw_fd())).build().into()
58    }
59}
60
61/// Get metadata of an opened file.
62pub struct FileStat<S> {
63    pub(crate) fd: SharedFd<S>,
64    pub(crate) stat: Statx,
65}
66
67impl<S> FileStat<S> {
68    /// Create [`FileStat`].
69    pub fn new(fd: SharedFd<S>) -> Self {
70        Self {
71            fd,
72            stat: unsafe { std::mem::zeroed() },
73        }
74    }
75}
76
77impl<S: AsRawFd> OpCode for FileStat<S> {
78    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
79        static EMPTY_NAME: &[u8] = b"\0";
80        opcode::Statx::new(
81            Fd(self.fd.as_raw_fd()),
82            EMPTY_NAME.as_ptr().cast(),
83            std::ptr::addr_of_mut!(self.stat).cast(),
84        )
85        .flags(libc::AT_EMPTY_PATH)
86        .build()
87        .into()
88    }
89}
90
91impl<S> IntoInner for FileStat<S> {
92    type Inner = libc::stat;
93
94    fn into_inner(self) -> Self::Inner {
95        statx_to_stat(self.stat)
96    }
97}
98
99/// Get metadata from path.
100pub struct PathStat {
101    pub(crate) path: CString,
102    pub(crate) stat: Statx,
103    pub(crate) follow_symlink: bool,
104}
105
106impl PathStat {
107    /// Create [`PathStat`].
108    pub fn new(path: CString, follow_symlink: bool) -> Self {
109        Self {
110            path,
111            stat: unsafe { std::mem::zeroed() },
112            follow_symlink,
113        }
114    }
115}
116
117impl OpCode for PathStat {
118    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
119        let mut flags = libc::AT_EMPTY_PATH;
120        if !self.follow_symlink {
121            flags |= libc::AT_SYMLINK_NOFOLLOW;
122        }
123        opcode::Statx::new(
124            Fd(libc::AT_FDCWD),
125            self.path.as_ptr(),
126            std::ptr::addr_of_mut!(self.stat).cast(),
127        )
128        .flags(flags)
129        .build()
130        .into()
131    }
132}
133
134impl IntoInner for PathStat {
135    type Inner = libc::stat;
136
137    fn into_inner(self) -> Self::Inner {
138        statx_to_stat(self.stat)
139    }
140}
141
142impl<T: IoBufMut, S: AsRawFd> OpCode for ReadAt<T, S> {
143    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
144        let fd = Fd(self.fd.as_raw_fd());
145        let offset = self.offset;
146        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
147        opcode::Read::new(fd, slice.as_mut_ptr() as _, slice.len() as _)
148            .offset(offset)
149            .build()
150            .into()
151    }
152}
153
154impl<T: IoVectoredBufMut, S: AsRawFd> OpCode for ReadVectoredAt<T, S> {
155    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
156        let this = unsafe { self.get_unchecked_mut() };
157        this.slices = unsafe { this.buffer.io_slices_mut() };
158        opcode::Readv::new(
159            Fd(this.fd.as_raw_fd()),
160            this.slices.as_ptr() as _,
161            this.slices.len() as _,
162        )
163        .offset(this.offset)
164        .build()
165        .into()
166    }
167}
168
169impl<T: IoBuf, S: AsRawFd> OpCode for WriteAt<T, S> {
170    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
171        let slice = self.buffer.as_slice();
172        opcode::Write::new(Fd(self.fd.as_raw_fd()), slice.as_ptr(), slice.len() as _)
173            .offset(self.offset)
174            .build()
175            .into()
176    }
177}
178
179impl<T: IoVectoredBuf, S: AsRawFd> OpCode for WriteVectoredAt<T, S> {
180    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
181        let this = unsafe { self.get_unchecked_mut() };
182        this.slices = unsafe { this.buffer.io_slices() };
183        opcode::Writev::new(
184            Fd(this.fd.as_raw_fd()),
185            this.slices.as_ptr() as _,
186            this.slices.len() as _,
187        )
188        .offset(this.offset)
189        .build()
190        .into()
191    }
192}
193
194impl<S: AsRawFd> OpCode for Sync<S> {
195    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
196        opcode::Fsync::new(Fd(self.fd.as_raw_fd()))
197            .flags(if self.datasync {
198                FsyncFlags::DATASYNC
199            } else {
200                FsyncFlags::empty()
201            })
202            .build()
203            .into()
204    }
205}
206
207impl OpCode for Unlink {
208    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
209        opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
210            .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
211            .build()
212            .into()
213    }
214}
215
216impl OpCode for CreateDir {
217    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
218        opcode::MkDirAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
219            .mode(self.mode)
220            .build()
221            .into()
222    }
223}
224
225impl OpCode for Rename {
226    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
227        opcode::RenameAt::new(
228            Fd(libc::AT_FDCWD),
229            self.old_path.as_ptr(),
230            Fd(libc::AT_FDCWD),
231            self.new_path.as_ptr(),
232        )
233        .build()
234        .into()
235    }
236}
237
238impl OpCode for Symlink {
239    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
240        opcode::SymlinkAt::new(
241            Fd(libc::AT_FDCWD),
242            self.source.as_ptr(),
243            self.target.as_ptr(),
244        )
245        .build()
246        .into()
247    }
248}
249
250impl OpCode for HardLink {
251    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
252        opcode::LinkAt::new(
253            Fd(libc::AT_FDCWD),
254            self.source.as_ptr(),
255            Fd(libc::AT_FDCWD),
256            self.target.as_ptr(),
257        )
258        .build()
259        .into()
260    }
261}
262
263impl OpCode for CreateSocket {
264    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
265        if cfg!(feature = "io-uring-socket") {
266            opcode::Socket::new(self.domain, self.socket_type, self.protocol)
267                .build()
268                .into()
269        } else {
270            OpEntry::Blocking
271        }
272    }
273
274    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
275        Ok(syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _)
276    }
277}
278
279impl<S: AsRawFd> OpCode for ShutdownSocket<S> {
280    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
281        opcode::Shutdown::new(Fd(self.fd.as_raw_fd()), self.how())
282            .build()
283            .into()
284    }
285}
286
287impl OpCode for CloseSocket {
288    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
289        opcode::Close::new(Fd(self.fd.as_raw_fd())).build().into()
290    }
291}
292
293impl<S: AsRawFd> OpCode for Accept<S> {
294    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
295        let this = unsafe { self.get_unchecked_mut() };
296        opcode::Accept::new(
297            Fd(this.fd.as_raw_fd()),
298            &mut this.buffer as *mut sockaddr_storage as *mut libc::sockaddr,
299            &mut this.addr_len,
300        )
301        .build()
302        .into()
303    }
304
305    unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
306        self.get_unchecked_mut().accepted_fd = Some(OwnedFd::from_raw_fd(fd as _));
307    }
308}
309
310impl<S: AsRawFd> OpCode for Connect<S> {
311    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
312        opcode::Connect::new(Fd(self.fd.as_raw_fd()), self.addr.as_ptr(), self.addr.len())
313            .build()
314            .into()
315    }
316}
317
318impl<T: IoBufMut, S: AsRawFd> OpCode for Recv<T, S> {
319    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
320        let fd = self.fd.as_raw_fd();
321        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
322        opcode::Read::new(Fd(fd), slice.as_mut_ptr() as _, slice.len() as _)
323            .build()
324            .into()
325    }
326}
327
328impl<T: IoVectoredBufMut, S: AsRawFd> OpCode for RecvVectored<T, S> {
329    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
330        let this = unsafe { self.get_unchecked_mut() };
331        this.slices = unsafe { this.buffer.io_slices_mut() };
332        opcode::Readv::new(
333            Fd(this.fd.as_raw_fd()),
334            this.slices.as_ptr() as _,
335            this.slices.len() as _,
336        )
337        .build()
338        .into()
339    }
340}
341
342impl<T: IoBuf, S: AsRawFd> OpCode for Send<T, S> {
343    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
344        let slice = self.buffer.as_slice();
345        opcode::Write::new(Fd(self.fd.as_raw_fd()), slice.as_ptr(), slice.len() as _)
346            .build()
347            .into()
348    }
349}
350
351impl<T: IoVectoredBuf, S: AsRawFd> OpCode for SendVectored<T, S> {
352    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
353        let this = unsafe { self.get_unchecked_mut() };
354        this.slices = unsafe { this.buffer.io_slices() };
355        opcode::Writev::new(
356            Fd(this.fd.as_raw_fd()),
357            this.slices.as_ptr() as _,
358            this.slices.len() as _,
359        )
360        .build()
361        .into()
362    }
363}
364
365struct RecvFromHeader<S> {
366    pub(crate) fd: SharedFd<S>,
367    pub(crate) addr: sockaddr_storage,
368    pub(crate) msg: libc::msghdr,
369    _p: PhantomPinned,
370}
371
372impl<S> RecvFromHeader<S> {
373    pub fn new(fd: SharedFd<S>) -> Self {
374        Self {
375            fd,
376            addr: unsafe { std::mem::zeroed() },
377            msg: unsafe { std::mem::zeroed() },
378            _p: PhantomPinned,
379        }
380    }
381}
382
383impl<S: AsRawFd> RecvFromHeader<S> {
384    pub fn create_entry(&mut self, slices: &mut [IoSliceMut]) -> OpEntry {
385        self.msg.msg_name = &mut self.addr as *mut _ as _;
386        self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
387        self.msg.msg_iov = slices.as_mut_ptr() as _;
388        self.msg.msg_iovlen = slices.len() as _;
389        opcode::RecvMsg::new(Fd(self.fd.as_raw_fd()), &mut self.msg)
390            .build()
391            .into()
392    }
393
394    pub fn into_addr(self) -> (sockaddr_storage, socklen_t) {
395        (self.addr, self.msg.msg_namelen)
396    }
397}
398
399/// Receive data and source address.
400pub struct RecvFrom<T: IoBufMut, S> {
401    header: RecvFromHeader<S>,
402    buffer: T,
403    slice: [IoSliceMut; 1],
404}
405
406impl<T: IoBufMut, S> RecvFrom<T, S> {
407    /// Create [`RecvFrom`].
408    pub fn new(fd: SharedFd<S>, buffer: T) -> Self {
409        Self {
410            header: RecvFromHeader::new(fd),
411            buffer,
412            // SAFETY: We never use this slice.
413            slice: [unsafe { IoSliceMut::from_slice(&mut []) }],
414        }
415    }
416}
417
418impl<T: IoBufMut, S: AsRawFd> OpCode for RecvFrom<T, S> {
419    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
420        let this = unsafe { self.get_unchecked_mut() };
421        this.slice[0] = unsafe { this.buffer.as_io_slice_mut() };
422        this.header.create_entry(&mut this.slice)
423    }
424}
425
426impl<T: IoBufMut, S: AsRawFd> IntoInner for RecvFrom<T, S> {
427    type Inner = (T, sockaddr_storage, socklen_t);
428
429    fn into_inner(self) -> Self::Inner {
430        let (addr, addr_len) = self.header.into_addr();
431        (self.buffer, addr, addr_len)
432    }
433}
434
435/// Receive data and source address into vectored buffer.
436pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
437    header: RecvFromHeader<S>,
438    buffer: T,
439    slice: Vec<IoSliceMut>,
440}
441
442impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
443    /// Create [`RecvFromVectored`].
444    pub fn new(fd: SharedFd<S>, buffer: T) -> Self {
445        Self {
446            header: RecvFromHeader::new(fd),
447            buffer,
448            slice: vec![],
449        }
450    }
451}
452
453impl<T: IoVectoredBufMut, S: AsRawFd> OpCode for RecvFromVectored<T, S> {
454    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
455        let this = unsafe { self.get_unchecked_mut() };
456        this.slice = unsafe { this.buffer.io_slices_mut() };
457        this.header.create_entry(&mut this.slice)
458    }
459}
460
461impl<T: IoVectoredBufMut, S: AsRawFd> IntoInner for RecvFromVectored<T, S> {
462    type Inner = (T, sockaddr_storage, socklen_t);
463
464    fn into_inner(self) -> Self::Inner {
465        let (addr, addr_len) = self.header.into_addr();
466        (self.buffer, addr, addr_len)
467    }
468}
469
470struct SendToHeader<S> {
471    pub(crate) fd: SharedFd<S>,
472    pub(crate) addr: SockAddr,
473    pub(crate) msg: libc::msghdr,
474    _p: PhantomPinned,
475}
476
477impl<S> SendToHeader<S> {
478    pub fn new(fd: SharedFd<S>, addr: SockAddr) -> Self {
479        Self {
480            fd,
481            addr,
482            msg: unsafe { std::mem::zeroed() },
483            _p: PhantomPinned,
484        }
485    }
486}
487
488impl<S: AsRawFd> SendToHeader<S> {
489    pub fn create_entry(&mut self, slices: &mut [IoSlice]) -> OpEntry {
490        self.msg.msg_name = self.addr.as_ptr() as _;
491        self.msg.msg_namelen = self.addr.len();
492        self.msg.msg_iov = slices.as_mut_ptr() as _;
493        self.msg.msg_iovlen = slices.len() as _;
494        opcode::SendMsg::new(Fd(self.fd.as_raw_fd()), &self.msg)
495            .build()
496            .into()
497    }
498}
499
500/// Send data to specified address.
501pub struct SendTo<T: IoBuf, S> {
502    header: SendToHeader<S>,
503    buffer: T,
504    slice: [IoSlice; 1],
505}
506
507impl<T: IoBuf, S> SendTo<T, S> {
508    /// Create [`SendTo`].
509    pub fn new(fd: SharedFd<S>, buffer: T, addr: SockAddr) -> Self {
510        Self {
511            header: SendToHeader::new(fd, addr),
512            buffer,
513            // SAFETY: We never use this slice.
514            slice: [unsafe { IoSlice::from_slice(&[]) }],
515        }
516    }
517}
518
519impl<T: IoBuf, S: AsRawFd> OpCode for SendTo<T, S> {
520    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
521        let this = unsafe { self.get_unchecked_mut() };
522        this.slice[0] = unsafe { this.buffer.as_io_slice() };
523        this.header.create_entry(&mut this.slice)
524    }
525}
526
527impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
528    type Inner = T;
529
530    fn into_inner(self) -> Self::Inner {
531        self.buffer
532    }
533}
534
535/// Send data to specified address from vectored buffer.
536pub struct SendToVectored<T: IoVectoredBuf, S> {
537    header: SendToHeader<S>,
538    buffer: T,
539    slice: Vec<IoSlice>,
540}
541
542impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
543    /// Create [`SendToVectored`].
544    pub fn new(fd: SharedFd<S>, buffer: T, addr: SockAddr) -> Self {
545        Self {
546            header: SendToHeader::new(fd, addr),
547            buffer,
548            slice: vec![],
549        }
550    }
551}
552
553impl<T: IoVectoredBuf, S: AsRawFd> OpCode for SendToVectored<T, S> {
554    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
555        let this = unsafe { self.get_unchecked_mut() };
556        this.slice = unsafe { this.buffer.io_slices() };
557        this.header.create_entry(&mut this.slice)
558    }
559}
560
561impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
562    type Inner = T;
563
564    fn into_inner(self) -> Self::Inner {
565        self.buffer
566    }
567}
568
569impl<T: IoVectoredBufMut, C: IoBufMut, S: AsRawFd> OpCode for RecvMsg<T, C, S> {
570    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
571        let this = unsafe { self.get_unchecked_mut() };
572        unsafe { this.set_msg() };
573        opcode::RecvMsg::new(Fd(this.fd.as_raw_fd()), &mut this.msg)
574            .build()
575            .into()
576    }
577}
578
579impl<T: IoVectoredBuf, C: IoBuf, S: AsRawFd> OpCode for SendMsg<T, C, S> {
580    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
581        let this = unsafe { self.get_unchecked_mut() };
582        unsafe { this.set_msg() };
583        opcode::SendMsg::new(Fd(this.fd.as_raw_fd()), &this.msg)
584            .build()
585            .into()
586    }
587}
588
589impl<S: AsRawFd> OpCode for PollOnce<S> {
590    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
591        let flags = match self.interest {
592            Interest::Readable => libc::POLLIN,
593            Interest::Writable => libc::POLLOUT,
594        };
595        opcode::PollAdd::new(Fd(self.fd.as_raw_fd()), flags as _)
596            .build()
597            .into()
598    }
599}