compio_driver/poll/
op.rs

1#[cfg(aio)]
2use std::ptr::NonNull;
3use std::{
4    ffi::CString,
5    io,
6    marker::PhantomPinned,
7    os::fd::{FromRawFd, OwnedFd},
8    pin::Pin,
9    task::Poll,
10};
11
12use compio_buf::{
13    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
14};
15#[cfg(not(gnulinux))]
16use libc::open;
17#[cfg(gnulinux)]
18use libc::open64 as open;
19#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "hurd")))]
20use libc::{pread, preadv, pwrite, pwritev};
21#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
22use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
23use socket2::SockAddr;
24
25use super::{AsRawFd, Decision, OpCode, OpType, sockaddr_storage, socklen_t, syscall};
26pub use crate::unix::op::*;
27use crate::{SharedFd, op::*};
28
29impl<
30    D: std::marker::Send + 'static,
31    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
32> OpCode for Asyncify<F, D>
33{
34    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
35        Ok(Decision::Blocking)
36    }
37
38    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
39        // Safety: self won't be moved
40        let this = unsafe { self.get_unchecked_mut() };
41        let f = this
42            .f
43            .take()
44            .expect("the operate method could only be called once");
45        let BufResult(res, data) = f();
46        this.data = Some(data);
47        Poll::Ready(res)
48    }
49}
50
51impl OpCode for OpenFile {
52    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
53        Ok(Decision::Blocking)
54    }
55
56    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
57        Poll::Ready(Ok(syscall!(open(
58            self.path.as_ptr(),
59            self.flags,
60            self.mode as libc::c_int
61        ))? as _))
62    }
63}
64
65impl OpCode for CloseFile {
66    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
67        Ok(Decision::Blocking)
68    }
69
70    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
71        Poll::Ready(Ok(syscall!(libc::close(self.fd.as_raw_fd()))? as _))
72    }
73}
74
/// Get metadata of an opened file.
///
/// The `OpCode` implementation stores the result in `stat`, and
/// `IntoInner::into_inner` returns that value.
pub struct FileStat<S> {
    /// Descriptor of the file whose metadata is queried.
    pub(crate) fd: SharedFd<S>,
    /// Result storage; zero-initialised by `FileStat::new` and written by the
    /// operation.
    pub(crate) stat: libc::stat,
}
80
81impl<S> FileStat<S> {
82    /// Create [`FileStat`].
83    pub fn new(fd: SharedFd<S>) -> Self {
84        Self {
85            fd,
86            stat: unsafe { std::mem::zeroed() },
87        }
88    }
89}
90
91impl<S: AsRawFd> OpCode for FileStat<S> {
92    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
93        Ok(Decision::Blocking)
94    }
95
96    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
97        #[cfg(gnulinux)]
98        {
99            let mut s: libc::statx = unsafe { std::mem::zeroed() };
100            static EMPTY_NAME: &[u8] = b"\0";
101            syscall!(libc::statx(
102                self.fd.as_raw_fd(),
103                EMPTY_NAME.as_ptr().cast(),
104                libc::AT_EMPTY_PATH,
105                0,
106                &mut s
107            ))?;
108            self.stat = statx_to_stat(s);
109            Poll::Ready(Ok(0))
110        }
111        #[cfg(not(gnulinux))]
112        {
113            Poll::Ready(Ok(
114                syscall!(libc::fstat(self.fd.as_raw_fd(), &mut self.stat))? as _,
115            ))
116        }
117    }
118}
119
120impl<S> IntoInner for FileStat<S> {
121    type Inner = libc::stat;
122
123    fn into_inner(self) -> Self::Inner {
124        self.stat
125    }
126}
127
/// Get metadata from path.
///
/// The `OpCode` implementation stores the result in `stat`, and
/// `IntoInner::into_inner` returns that value.
pub struct PathStat {
    /// NUL-terminated path to inspect.
    pub(crate) path: CString,
    /// Result storage; zero-initialised by `PathStat::new` and written by the
    /// operation.
    pub(crate) stat: libc::stat,
    /// When `false`, a symbolic link at `path` is inspected itself instead of
    /// being followed.
    pub(crate) follow_symlink: bool,
}
134
135impl PathStat {
136    /// Create [`PathStat`].
137    pub fn new(path: CString, follow_symlink: bool) -> Self {
138        Self {
139            path,
140            stat: unsafe { std::mem::zeroed() },
141            follow_symlink,
142        }
143    }
144}
145
146impl OpCode for PathStat {
147    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
148        Ok(Decision::Blocking)
149    }
150
151    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
152        #[cfg(gnulinux)]
153        {
154            let mut flags = libc::AT_EMPTY_PATH;
155            if !self.follow_symlink {
156                flags |= libc::AT_SYMLINK_NOFOLLOW;
157            }
158            let mut s: libc::statx = unsafe { std::mem::zeroed() };
159            syscall!(libc::statx(
160                libc::AT_FDCWD,
161                self.path.as_ptr(),
162                flags,
163                0,
164                &mut s
165            ))?;
166            self.stat = statx_to_stat(s);
167            Poll::Ready(Ok(0))
168        }
169        #[cfg(not(gnulinux))]
170        {
171            let f = if self.follow_symlink {
172                libc::stat
173            } else {
174                libc::lstat
175            };
176            Poll::Ready(Ok(syscall!(f(self.path.as_ptr(), &mut self.stat))? as _))
177        }
178    }
179}
180
181impl IntoInner for PathStat {
182    type Inner = libc::stat;
183
184    fn into_inner(self) -> Self::Inner {
185        self.stat
186    }
187}
188
189impl<T: IoBufMut, S: AsRawFd> OpCode for ReadAt<T, S> {
190    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
191        #[cfg(aio)]
192        {
193            let this = unsafe { self.get_unchecked_mut() };
194            let slice = this.buffer.as_mut_slice();
195
196            this.aiocb.aio_fildes = this.fd.as_raw_fd();
197            this.aiocb.aio_offset = this.offset as _;
198            this.aiocb.aio_buf = slice.as_mut_ptr().cast();
199            this.aiocb.aio_nbytes = slice.len();
200
201            Ok(Decision::aio(&mut this.aiocb, libc::aio_read))
202        }
203        #[cfg(not(aio))]
204        {
205            Ok(Decision::Blocking)
206        }
207    }
208
209    #[cfg(aio)]
210    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
211        Some(OpType::Aio(NonNull::from(
212            &mut unsafe { self.get_unchecked_mut() }.aiocb,
213        )))
214    }
215
216    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
217        let fd = self.fd.as_raw_fd();
218        let offset = self.offset;
219        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
220        syscall!(break pread(fd, slice.as_mut_ptr() as _, slice.len() as _, offset as _,))
221    }
222}
223
224impl<T: IoVectoredBufMut, S: AsRawFd> OpCode for ReadVectoredAt<T, S> {
225    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
226        #[cfg(freebsd)]
227        {
228            let this = unsafe { self.get_unchecked_mut() };
229            this.slices = unsafe { this.buffer.io_slices_mut() };
230
231            this.aiocb.aio_fildes = this.fd.as_raw_fd();
232            this.aiocb.aio_offset = this.offset as _;
233            this.aiocb.aio_buf = this.slices.as_mut_ptr().cast();
234            this.aiocb.aio_nbytes = this.slices.len();
235
236            Ok(Decision::aio(&mut this.aiocb, libc::aio_readv))
237        }
238        #[cfg(not(freebsd))]
239        {
240            Ok(Decision::Blocking)
241        }
242    }
243
244    #[cfg(freebsd)]
245    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
246        Some(OpType::Aio(NonNull::from(
247            &mut unsafe { self.get_unchecked_mut() }.aiocb,
248        )))
249    }
250
251    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
252        let this = unsafe { self.get_unchecked_mut() };
253        this.slices = unsafe { this.buffer.io_slices_mut() };
254        syscall!(
255            break preadv(
256                this.fd.as_raw_fd(),
257                this.slices.as_ptr() as _,
258                this.slices.len() as _,
259                this.offset as _,
260            )
261        )
262    }
263}
264
265impl<T: IoBuf, S: AsRawFd> OpCode for WriteAt<T, S> {
266    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
267        #[cfg(aio)]
268        {
269            let this = unsafe { self.get_unchecked_mut() };
270            let slice = this.buffer.as_slice();
271
272            this.aiocb.aio_fildes = this.fd.as_raw_fd();
273            this.aiocb.aio_offset = this.offset as _;
274            this.aiocb.aio_buf = slice.as_ptr().cast_mut().cast();
275            this.aiocb.aio_nbytes = slice.len();
276
277            Ok(Decision::aio(&mut this.aiocb, libc::aio_write))
278        }
279        #[cfg(not(aio))]
280        {
281            Ok(Decision::Blocking)
282        }
283    }
284
285    #[cfg(aio)]
286    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
287        Some(OpType::Aio(NonNull::from(
288            &mut unsafe { self.get_unchecked_mut() }.aiocb,
289        )))
290    }
291
292    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
293        let slice = self.buffer.as_slice();
294        syscall!(
295            break pwrite(
296                self.fd.as_raw_fd(),
297                slice.as_ptr() as _,
298                slice.len() as _,
299                self.offset as _,
300            )
301        )
302    }
303}
304
305impl<T: IoVectoredBuf, S: AsRawFd> OpCode for WriteVectoredAt<T, S> {
306    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
307        #[cfg(freebsd)]
308        {
309            let this = unsafe { self.get_unchecked_mut() };
310            this.slices = unsafe { this.buffer.io_slices() };
311
312            this.aiocb.aio_fildes = this.fd.as_raw_fd();
313            this.aiocb.aio_offset = this.offset as _;
314            this.aiocb.aio_buf = this.slices.as_ptr().cast_mut().cast();
315            this.aiocb.aio_nbytes = this.slices.len();
316
317            Ok(Decision::aio(&mut this.aiocb, libc::aio_writev))
318        }
319        #[cfg(not(freebsd))]
320        {
321            Ok(Decision::Blocking)
322        }
323    }
324
325    #[cfg(freebsd)]
326    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
327        Some(OpType::Aio(NonNull::from(
328            &mut unsafe { self.get_unchecked_mut() }.aiocb,
329        )))
330    }
331
332    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
333        let this = unsafe { self.get_unchecked_mut() };
334        this.slices = unsafe { this.buffer.io_slices() };
335        syscall!(
336            break pwritev(
337                this.fd.as_raw_fd(),
338                this.slices.as_ptr() as _,
339                this.slices.len() as _,
340                this.offset as _,
341            )
342        )
343    }
344}
345
346impl<S: AsRawFd> OpCode for Sync<S> {
347    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
348        #[cfg(aio)]
349        {
350            unsafe extern "C" fn aio_fsync(aiocbp: *mut libc::aiocb) -> i32 {
351                libc::aio_fsync(libc::O_SYNC, aiocbp)
352            }
353            unsafe extern "C" fn aio_fdatasync(aiocbp: *mut libc::aiocb) -> i32 {
354                libc::aio_fsync(libc::O_DSYNC, aiocbp)
355            }
356
357            let this = unsafe { self.get_unchecked_mut() };
358            this.aiocb.aio_fildes = this.fd.as_raw_fd();
359
360            let f = if this.datasync {
361                aio_fdatasync
362            } else {
363                aio_fsync
364            };
365
366            Ok(Decision::aio(&mut this.aiocb, f))
367        }
368        #[cfg(not(aio))]
369        {
370            Ok(Decision::Blocking)
371        }
372    }
373
374    #[cfg(aio)]
375    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
376        Some(OpType::Aio(NonNull::from(
377            &mut unsafe { self.get_unchecked_mut() }.aiocb,
378        )))
379    }
380
381    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
382        #[cfg(datasync)]
383        {
384            Poll::Ready(Ok(syscall!(if self.datasync {
385                libc::fdatasync(self.fd.as_raw_fd())
386            } else {
387                libc::fsync(self.fd.as_raw_fd())
388            })? as _))
389        }
390        #[cfg(not(datasync))]
391        {
392            Poll::Ready(Ok(syscall!(libc::fsync(self.fd.as_raw_fd()))? as _))
393        }
394    }
395}
396
397impl OpCode for Unlink {
398    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
399        Ok(Decision::Blocking)
400    }
401
402    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
403        if self.dir {
404            syscall!(libc::rmdir(self.path.as_ptr()))?;
405        } else {
406            syscall!(libc::unlink(self.path.as_ptr()))?;
407        }
408        Poll::Ready(Ok(0))
409    }
410}
411
412impl OpCode for CreateDir {
413    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
414        Ok(Decision::Blocking)
415    }
416
417    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
418        syscall!(libc::mkdir(self.path.as_ptr(), self.mode))?;
419        Poll::Ready(Ok(0))
420    }
421}
422
423impl OpCode for Rename {
424    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
425        Ok(Decision::Blocking)
426    }
427
428    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
429        syscall!(libc::rename(self.old_path.as_ptr(), self.new_path.as_ptr()))?;
430        Poll::Ready(Ok(0))
431    }
432}
433
434impl OpCode for Symlink {
435    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
436        Ok(Decision::Blocking)
437    }
438
439    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
440        syscall!(libc::symlink(self.source.as_ptr(), self.target.as_ptr()))?;
441        Poll::Ready(Ok(0))
442    }
443}
444
445impl OpCode for HardLink {
446    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
447        Ok(Decision::Blocking)
448    }
449
450    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
451        syscall!(libc::link(self.source.as_ptr(), self.target.as_ptr()))?;
452        Poll::Ready(Ok(0))
453    }
454}
455
456impl OpCode for CreateSocket {
457    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
458        Ok(Decision::Blocking)
459    }
460
461    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
462        Poll::Ready(Ok(
463            syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _,
464        ))
465    }
466}
467
468impl<S: AsRawFd> OpCode for ShutdownSocket<S> {
469    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
470        Ok(Decision::Blocking)
471    }
472
473    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
474        Poll::Ready(Ok(
475            syscall!(libc::shutdown(self.fd.as_raw_fd(), self.how()))? as _,
476        ))
477    }
478}
479
480impl OpCode for CloseSocket {
481    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
482        Ok(Decision::Blocking)
483    }
484
485    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
486        Poll::Ready(Ok(syscall!(libc::close(self.fd.as_raw_fd()))? as _))
487    }
488}
489
490impl<S: AsRawFd> Accept<S> {
491    unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
492        let this = self.get_unchecked_mut();
493        libc::accept(
494            this.fd.as_raw_fd(),
495            &mut this.buffer as *mut _ as *mut _,
496            &mut this.addr_len,
497        )
498    }
499}
500
501impl<S: AsRawFd> OpCode for Accept<S> {
502    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
503        let fd = self.fd.as_raw_fd();
504        syscall!(self.as_mut().call(), wait_readable(fd))
505    }
506
507    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
508        Some(OpType::Fd(self.fd.as_raw_fd()))
509    }
510
511    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
512        let res = syscall!(break self.as_mut().call());
513        if let Poll::Ready(Ok(fd)) = res {
514            unsafe {
515                self.get_unchecked_mut().accepted_fd = Some(OwnedFd::from_raw_fd(fd as _));
516            }
517        }
518        res
519    }
520}
521
522impl<S: AsRawFd> OpCode for Connect<S> {
523    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
524        syscall!(
525            libc::connect(self.fd.as_raw_fd(), self.addr.as_ptr(), self.addr.len()),
526            wait_writable(self.fd.as_raw_fd())
527        )
528    }
529
530    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
531        Some(OpType::Fd(self.fd.as_raw_fd()))
532    }
533
534    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
535        let mut err: libc::c_int = 0;
536        let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
537
538        syscall!(libc::getsockopt(
539            self.fd.as_raw_fd(),
540            libc::SOL_SOCKET,
541            libc::SO_ERROR,
542            &mut err as *mut _ as *mut _,
543            &mut err_len
544        ))?;
545
546        let res = if err == 0 {
547            Ok(0)
548        } else {
549            Err(io::Error::from_raw_os_error(err))
550        };
551        Poll::Ready(res)
552    }
553}
554
555impl<T: IoBufMut, S: AsRawFd> OpCode for Recv<T, S> {
556    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
557        Ok(Decision::wait_readable(self.fd.as_raw_fd()))
558    }
559
560    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
561        Some(OpType::Fd(self.fd.as_raw_fd()))
562    }
563
564    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
565        let fd = self.fd.as_raw_fd();
566        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
567        syscall!(break libc::read(fd, slice.as_mut_ptr() as _, slice.len()))
568    }
569}
570
571impl<T: IoVectoredBufMut, S: AsRawFd> OpCode for RecvVectored<T, S> {
572    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
573        Ok(Decision::wait_readable(self.fd.as_raw_fd()))
574    }
575
576    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
577        Some(OpType::Fd(self.fd.as_raw_fd()))
578    }
579
580    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
581        let this = unsafe { self.get_unchecked_mut() };
582        this.slices = unsafe { this.buffer.io_slices_mut() };
583        syscall!(
584            break libc::readv(
585                this.fd.as_raw_fd(),
586                this.slices.as_ptr() as _,
587                this.slices.len() as _
588            )
589        )
590    }
591}
592
593impl<T: IoBuf, S: AsRawFd> OpCode for Send<T, S> {
594    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
595        Ok(Decision::wait_writable(self.fd.as_raw_fd()))
596    }
597
598    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
599        Some(OpType::Fd(self.fd.as_raw_fd()))
600    }
601
602    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
603        let slice = self.buffer.as_slice();
604        syscall!(break libc::write(self.fd.as_raw_fd(), slice.as_ptr() as _, slice.len()))
605    }
606}
607
608impl<T: IoVectoredBuf, S: AsRawFd> OpCode for SendVectored<T, S> {
609    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
610        Ok(Decision::wait_writable(self.fd.as_raw_fd()))
611    }
612
613    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
614        Some(OpType::Fd(self.fd.as_raw_fd()))
615    }
616
617    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
618        let this = unsafe { self.get_unchecked_mut() };
619        this.slices = unsafe { this.buffer.io_slices() };
620        syscall!(
621            break libc::writev(
622                this.fd.as_raw_fd(),
623                this.slices.as_ptr() as _,
624                this.slices.len() as _
625            )
626        )
627    }
628}
629
/// Receive data and source address.
///
/// `IntoInner::into_inner` returns the buffer together with the address
/// storage and the address length.
pub struct RecvFrom<T: IoBufMut, S> {
    /// Socket to receive from.
    pub(crate) fd: SharedFd<S>,
    /// Destination for the received payload.
    pub(crate) buffer: T,
    /// Storage handed to `recvfrom` for the sender's address.
    pub(crate) addr: sockaddr_storage,
    /// Length argument handed to `recvfrom` alongside `addr`; initialised to
    /// the size of `sockaddr_storage`.
    pub(crate) addr_len: socklen_t,
    /// Makes the type `!Unpin`. NOTE(review): presumably because pointers
    /// into this value are handed to the kernel — confirm.
    _p: PhantomPinned,
}
638
639impl<T: IoBufMut, S> RecvFrom<T, S> {
640    /// Create [`RecvFrom`].
641    pub fn new(fd: SharedFd<S>, buffer: T) -> Self {
642        Self {
643            fd,
644            buffer,
645            addr: unsafe { std::mem::zeroed() },
646            addr_len: std::mem::size_of::<sockaddr_storage>() as _,
647            _p: PhantomPinned,
648        }
649    }
650}
651
652impl<T: IoBufMut, S: AsRawFd> RecvFrom<T, S> {
653    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
654        let this = self.get_unchecked_mut();
655        let fd = this.fd.as_raw_fd();
656        let slice = this.buffer.as_mut_slice();
657        libc::recvfrom(
658            fd,
659            slice.as_mut_ptr() as _,
660            slice.len(),
661            0,
662            &mut this.addr as *mut _ as _,
663            &mut this.addr_len,
664        )
665    }
666}
667
668impl<T: IoBufMut, S: AsRawFd> OpCode for RecvFrom<T, S> {
669    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
670        let fd = self.fd.as_raw_fd();
671        syscall!(self.as_mut().call(), wait_readable(fd))
672    }
673
674    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
675        Some(OpType::Fd(self.fd.as_raw_fd()))
676    }
677
678    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
679        syscall!(break self.as_mut().call())
680    }
681}
682
683impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
684    type Inner = (T, sockaddr_storage, socklen_t);
685
686    fn into_inner(self) -> Self::Inner {
687        (self.buffer, self.addr, self.addr_len)
688    }
689}
690
/// Receive data and source address into vectored buffer.
///
/// `IntoInner::into_inner` returns the buffer together with the address
/// storage and `msg.msg_namelen`.
pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
    /// Socket to receive from.
    pub(crate) fd: SharedFd<S>,
    /// Destination buffers for the received payload.
    pub(crate) buffer: T,
    /// Slice list built from `buffer`; `msg.msg_iov` points at it.
    pub(crate) slices: Vec<IoSliceMut>,
    /// Storage for the sender's address; `msg.msg_name` points at it.
    pub(crate) addr: sockaddr_storage,
    /// Message header passed to `recvmsg`.
    pub(crate) msg: libc::msghdr,
    /// Makes the type `!Unpin`; `msg` holds a raw pointer to the `addr` field
    /// of this same value.
    _p: PhantomPinned,
}
700
701impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
702    /// Create [`RecvFromVectored`].
703    pub fn new(fd: SharedFd<S>, buffer: T) -> Self {
704        Self {
705            fd,
706            buffer,
707            slices: vec![],
708            addr: unsafe { std::mem::zeroed() },
709            msg: unsafe { std::mem::zeroed() },
710            _p: PhantomPinned,
711        }
712    }
713}
714
715impl<T: IoVectoredBufMut, S: AsRawFd> RecvFromVectored<T, S> {
716    fn set_msg(&mut self) {
717        self.slices = unsafe { self.buffer.io_slices_mut() };
718        self.msg = libc::msghdr {
719            msg_name: &mut self.addr as *mut _ as _,
720            msg_namelen: std::mem::size_of_val(&self.addr) as _,
721            msg_iov: self.slices.as_mut_ptr() as _,
722            msg_iovlen: self.slices.len() as _,
723            msg_control: std::ptr::null_mut(),
724            msg_controllen: 0,
725            msg_flags: 0,
726        };
727    }
728
729    unsafe fn call(&mut self) -> libc::ssize_t {
730        libc::recvmsg(self.fd.as_raw_fd(), &mut self.msg, 0)
731    }
732}
733
734impl<T: IoVectoredBufMut, S: AsRawFd> OpCode for RecvFromVectored<T, S> {
735    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
736        let this = unsafe { self.get_unchecked_mut() };
737        this.set_msg();
738        syscall!(this.call(), wait_readable(this.fd.as_raw_fd()))
739    }
740
741    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
742        Some(OpType::Fd(self.fd.as_raw_fd()))
743    }
744
745    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
746        let this = unsafe { self.get_unchecked_mut() };
747        syscall!(break this.call())
748    }
749}
750
751impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
752    type Inner = (T, sockaddr_storage, socklen_t);
753
754    fn into_inner(self) -> Self::Inner {
755        (self.buffer, self.addr, self.msg.msg_namelen)
756    }
757}
758
/// Send data to specified address.
///
/// `IntoInner::into_inner` hands the buffer back.
pub struct SendTo<T: IoBuf, S> {
    /// Socket to send on.
    pub(crate) fd: SharedFd<S>,
    /// Payload to send.
    pub(crate) buffer: T,
    /// Destination address passed to `sendto`.
    pub(crate) addr: SockAddr,
    /// Makes the type `!Unpin`. NOTE(review): presumably because pointers
    /// into this value are handed to the kernel — confirm.
    _p: PhantomPinned,
}
766
767impl<T: IoBuf, S> SendTo<T, S> {
768    /// Create [`SendTo`].
769    pub fn new(fd: SharedFd<S>, buffer: T, addr: SockAddr) -> Self {
770        Self {
771            fd,
772            buffer,
773            addr,
774            _p: PhantomPinned,
775        }
776    }
777}
778
779impl<T: IoBuf, S: AsRawFd> SendTo<T, S> {
780    unsafe fn call(&self) -> libc::ssize_t {
781        let slice = self.buffer.as_slice();
782        libc::sendto(
783            self.fd.as_raw_fd(),
784            slice.as_ptr() as _,
785            slice.len(),
786            0,
787            self.addr.as_ptr(),
788            self.addr.len(),
789        )
790    }
791}
792
793impl<T: IoBuf, S: AsRawFd> OpCode for SendTo<T, S> {
794    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
795        syscall!(self.call(), wait_writable(self.fd.as_raw_fd()))
796    }
797
798    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
799        Some(OpType::Fd(self.fd.as_raw_fd()))
800    }
801
802    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
803        syscall!(break self.call())
804    }
805}
806
807impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
808    type Inner = T;
809
810    fn into_inner(self) -> Self::Inner {
811        self.buffer
812    }
813}
814
/// Send data to specified address from vectored buffer.
///
/// `IntoInner::into_inner` hands the buffer back.
pub struct SendToVectored<T: IoVectoredBuf, S> {
    /// Socket to send on.
    pub(crate) fd: SharedFd<S>,
    /// Payload buffers to send.
    pub(crate) buffer: T,
    /// Destination address; `msg.msg_name` is derived from it.
    pub(crate) addr: SockAddr,
    /// Slice list built from `buffer`; `msg.msg_iov` points at it.
    pub(crate) slices: Vec<IoSlice>,
    /// Message header passed to `sendmsg`.
    pub(crate) msg: libc::msghdr,
    /// Makes the type `!Unpin`; `msg` holds a raw pointer to the `addr` field
    /// of this same value.
    _p: PhantomPinned,
}
824
825impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
826    /// Create [`SendToVectored`].
827    pub fn new(fd: SharedFd<S>, buffer: T, addr: SockAddr) -> Self {
828        Self {
829            fd,
830            buffer,
831            addr,
832            slices: vec![],
833            msg: unsafe { std::mem::zeroed() },
834            _p: PhantomPinned,
835        }
836    }
837}
838
839impl<T: IoVectoredBuf, S: AsRawFd> SendToVectored<T, S> {
840    fn set_msg(&mut self) {
841        self.slices = unsafe { self.buffer.io_slices() };
842        self.msg = libc::msghdr {
843            msg_name: &mut self.addr as *mut _ as _,
844            msg_namelen: std::mem::size_of_val(&self.addr) as _,
845            msg_iov: self.slices.as_mut_ptr() as _,
846            msg_iovlen: self.slices.len() as _,
847            msg_control: std::ptr::null_mut(),
848            msg_controllen: 0,
849            msg_flags: 0,
850        };
851    }
852
853    unsafe fn call(&self) -> libc::ssize_t {
854        libc::sendmsg(self.fd.as_raw_fd(), &self.msg, 0)
855    }
856}
857
858impl<T: IoVectoredBuf, S: AsRawFd> OpCode for SendToVectored<T, S> {
859    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
860        let this = unsafe { self.get_unchecked_mut() };
861        this.set_msg();
862        syscall!(this.call(), wait_writable(this.fd.as_raw_fd()))
863    }
864
865    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
866        Some(OpType::Fd(self.fd.as_raw_fd()))
867    }
868
869    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
870        syscall!(break self.call())
871    }
872}
873
874impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
875    type Inner = T;
876
877    fn into_inner(self) -> Self::Inner {
878        self.buffer
879    }
880}
881
882impl<T: IoVectoredBufMut, C: IoBufMut, S: AsRawFd> RecvMsg<T, C, S> {
883    unsafe fn call(&mut self) -> libc::ssize_t {
884        libc::recvmsg(self.fd.as_raw_fd(), &mut self.msg, 0)
885    }
886}
887
888impl<T: IoVectoredBufMut, C: IoBufMut, S: AsRawFd> OpCode for RecvMsg<T, C, S> {
889    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
890        let this = unsafe { self.get_unchecked_mut() };
891        unsafe { this.set_msg() };
892        syscall!(this.call(), wait_readable(this.fd.as_raw_fd()))
893    }
894
895    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
896        Some(OpType::Fd(self.fd.as_raw_fd()))
897    }
898
899    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
900        let this = unsafe { self.get_unchecked_mut() };
901        syscall!(break this.call())
902    }
903}
904
905impl<T: IoVectoredBuf, C: IoBuf, S: AsRawFd> SendMsg<T, C, S> {
906    unsafe fn call(&self) -> libc::ssize_t {
907        libc::sendmsg(self.fd.as_raw_fd(), &self.msg, 0)
908    }
909}
910
911impl<T: IoVectoredBuf, C: IoBuf, S: AsRawFd> OpCode for SendMsg<T, C, S> {
912    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
913        let this = unsafe { self.get_unchecked_mut() };
914        unsafe { this.set_msg() };
915        syscall!(this.call(), wait_writable(this.fd.as_raw_fd()))
916    }
917
918    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
919        Some(OpType::Fd(self.fd.as_raw_fd()))
920    }
921
922    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
923        syscall!(break self.call())
924    }
925}
926
927impl<S: AsRawFd> OpCode for PollOnce<S> {
928    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
929        Ok(Decision::wait_for(self.fd.as_raw_fd(), self.interest))
930    }
931
932    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
933        Some(OpType::Fd(self.fd.as_raw_fd()))
934    }
935
936    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
937        Poll::Ready(Ok(0))
938    }
939}