compio_driver/iocp/
op.rs

1#[cfg(feature = "once_cell_try")]
2use std::sync::OnceLock;
3use std::{
4    io,
5    marker::PhantomPinned,
6    net::Shutdown,
7    os::windows::io::AsRawSocket,
8    pin::Pin,
9    ptr::{null, null_mut},
10    task::Poll,
11};
12
13use aligned_array::{A8, Aligned};
14use compio_buf::{
15    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
16};
17#[cfg(not(feature = "once_cell_try"))]
18use once_cell::sync::OnceCell as OnceLock;
19use socket2::SockAddr;
20use windows_sys::{
21    Win32::{
22        Foundation::{
23            CloseHandle, ERROR_BROKEN_PIPE, ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE,
24            ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_NOT_FOUND, ERROR_PIPE_CONNECTED,
25            ERROR_PIPE_NOT_CONNECTED, GetLastError,
26        },
27        Networking::WinSock::{
28            CMSGHDR, LPFN_ACCEPTEX, LPFN_CONNECTEX, LPFN_GETACCEPTEXSOCKADDRS, LPFN_WSARECVMSG,
29            SD_BOTH, SD_RECEIVE, SD_SEND, SIO_GET_EXTENSION_FUNCTION_POINTER,
30            SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, SOCKADDR, SOCKADDR_STORAGE,
31            SOL_SOCKET, WSABUF, WSAID_ACCEPTEX, WSAID_CONNECTEX, WSAID_GETACCEPTEXSOCKADDRS,
32            WSAID_WSARECVMSG, WSAIoctl, WSAMSG, WSARecv, WSARecvFrom, WSASend, WSASendMsg,
33            WSASendTo, closesocket, setsockopt, shutdown, socklen_t,
34        },
35        Storage::FileSystem::{FlushFileBuffers, ReadFile, WriteFile},
36        System::{
37            IO::{CancelIoEx, OVERLAPPED},
38            Pipes::ConnectNamedPipe,
39        },
40    },
41    core::GUID,
42};
43
44use crate::{AsRawFd, OpCode, OpType, RawFd, SharedFd, op::*, syscall};
45
46#[inline]
47fn winapi_result(transferred: u32) -> Poll<io::Result<usize>> {
48    let error = unsafe { GetLastError() };
49    assert_ne!(error, 0);
50    match error {
51        ERROR_IO_PENDING => Poll::Pending,
52        ERROR_IO_INCOMPLETE
53        | ERROR_HANDLE_EOF
54        | ERROR_BROKEN_PIPE
55        | ERROR_PIPE_CONNECTED
56        | ERROR_PIPE_NOT_CONNECTED
57        | ERROR_NO_DATA => Poll::Ready(Ok(transferred as _)),
58        _ => Poll::Ready(Err(io::Error::from_raw_os_error(error as _))),
59    }
60}
61
62#[inline]
63fn win32_result(res: i32, transferred: u32) -> Poll<io::Result<usize>> {
64    if res == 0 {
65        winapi_result(transferred)
66    } else {
67        Poll::Ready(Ok(transferred as _))
68    }
69}
70
71#[inline]
72fn winsock_result(res: i32, transferred: u32) -> Poll<io::Result<usize>> {
73    if res != 0 {
74        winapi_result(transferred)
75    } else {
76        Poll::Ready(Ok(transferred as _))
77    }
78}
79
80#[inline]
81fn cancel(handle: RawFd, optr: *mut OVERLAPPED) -> io::Result<()> {
82    match syscall!(BOOL, CancelIoEx(handle as _, optr)) {
83        Ok(_) => Ok(()),
84        Err(e) => {
85            if e.raw_os_error() == Some(ERROR_NOT_FOUND as _) {
86                Ok(())
87            } else {
88                Err(e)
89            }
90        }
91    }
92}
93
94fn get_wsa_fn<F>(handle: RawFd, fguid: GUID) -> io::Result<Option<F>> {
95    let mut fptr = None;
96    let mut returned = 0;
97    syscall!(
98        SOCKET,
99        WSAIoctl(
100            handle as _,
101            SIO_GET_EXTENSION_FUNCTION_POINTER,
102            std::ptr::addr_of!(fguid).cast(),
103            std::mem::size_of_val(&fguid) as _,
104            std::ptr::addr_of_mut!(fptr).cast(),
105            std::mem::size_of::<F>() as _,
106            &mut returned,
107            null_mut(),
108            None,
109        )
110    )?;
111    Ok(fptr)
112}
113
114impl<
115    D: std::marker::Send + 'static,
116    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
117> OpCode for Asyncify<F, D>
118{
119    fn op_type(&self) -> OpType {
120        OpType::Blocking
121    }
122
123    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
124        // Safety: self won't be moved
125        let this = self.get_unchecked_mut();
126        let f = this
127            .f
128            .take()
129            .expect("the operate method could only be called once");
130        let BufResult(res, data) = f();
131        this.data = Some(data);
132        Poll::Ready(res)
133    }
134}
135
136impl OpCode for CloseFile {
137    fn op_type(&self) -> OpType {
138        OpType::Blocking
139    }
140
141    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
142        Poll::Ready(Ok(syscall!(BOOL, CloseHandle(self.fd.as_raw_fd()))? as _))
143    }
144}
145
146impl<T: IoBufMut, S: AsRawFd> OpCode for ReadAt<T, S> {
147    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
148        if let Some(overlapped) = optr.as_mut() {
149            overlapped.Anonymous.Anonymous.Offset = (self.offset & 0xFFFFFFFF) as _;
150            overlapped.Anonymous.Anonymous.OffsetHigh = (self.offset >> 32) as _;
151        }
152        let fd = self.fd.as_raw_fd();
153        let slice = self.get_unchecked_mut().buffer.as_mut_slice();
154        let mut transferred = 0;
155        let res = ReadFile(
156            fd,
157            slice.as_mut_ptr() as _,
158            slice.len() as _,
159            &mut transferred,
160            optr,
161        );
162        win32_result(res, transferred)
163    }
164
165    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
166        cancel(self.fd.as_raw_fd(), optr)
167    }
168}
169
170impl<T: IoBuf, S: AsRawFd> OpCode for WriteAt<T, S> {
171    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
172        if let Some(overlapped) = optr.as_mut() {
173            overlapped.Anonymous.Anonymous.Offset = (self.offset & 0xFFFFFFFF) as _;
174            overlapped.Anonymous.Anonymous.OffsetHigh = (self.offset >> 32) as _;
175        }
176        let slice = self.buffer.as_slice();
177        let mut transferred = 0;
178        let res = WriteFile(
179            self.fd.as_raw_fd(),
180            slice.as_ptr() as _,
181            slice.len() as _,
182            &mut transferred,
183            optr,
184        );
185        win32_result(res, transferred)
186    }
187
188    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
189        cancel(self.fd.as_raw_fd(), optr)
190    }
191}
192
193impl<S: AsRawFd> OpCode for Sync<S> {
194    fn op_type(&self) -> OpType {
195        OpType::Blocking
196    }
197
198    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
199        Poll::Ready(Ok(
200            syscall!(BOOL, FlushFileBuffers(self.fd.as_raw_fd()))? as _
201        ))
202    }
203}
204
205impl<S: AsRawFd> OpCode for ShutdownSocket<S> {
206    fn op_type(&self) -> OpType {
207        OpType::Blocking
208    }
209
210    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
211        let how = match self.how {
212            Shutdown::Write => SD_SEND,
213            Shutdown::Read => SD_RECEIVE,
214            Shutdown::Both => SD_BOTH,
215        };
216        Poll::Ready(Ok(
217            syscall!(SOCKET, shutdown(self.fd.as_raw_fd() as _, how))? as _,
218        ))
219    }
220}
221
222impl OpCode for CloseSocket {
223    fn op_type(&self) -> OpType {
224        OpType::Blocking
225    }
226
227    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
228        Poll::Ready(Ok(
229            syscall!(SOCKET, closesocket(self.fd.as_raw_fd() as _))? as _
230        ))
231    }
232}
233
234static ACCEPT_EX: OnceLock<LPFN_ACCEPTEX> = OnceLock::new();
235static GET_ADDRS: OnceLock<LPFN_GETACCEPTEXSOCKADDRS> = OnceLock::new();
236
237const ACCEPT_ADDR_BUFFER_SIZE: usize = std::mem::size_of::<SOCKADDR_STORAGE>() + 16;
238const ACCEPT_BUFFER_SIZE: usize = ACCEPT_ADDR_BUFFER_SIZE * 2;
239
240/// Accept a connection.
241pub struct Accept<S> {
242    pub(crate) fd: SharedFd<S>,
243    pub(crate) accept_fd: socket2::Socket,
244    pub(crate) buffer: Aligned<A8, [u8; ACCEPT_BUFFER_SIZE]>,
245    _p: PhantomPinned,
246}
247
248impl<S> Accept<S> {
249    /// Create [`Accept`]. `accept_fd` should not be bound.
250    pub fn new(fd: SharedFd<S>, accept_fd: socket2::Socket) -> Self {
251        Self {
252            fd,
253            accept_fd,
254            buffer: unsafe { std::mem::zeroed() },
255            _p: PhantomPinned,
256        }
257    }
258}
259
260impl<S: AsRawFd> Accept<S> {
261    /// Update accept context.
262    pub fn update_context(&self) -> io::Result<()> {
263        let fd = self.fd.as_raw_fd();
264        syscall!(
265            SOCKET,
266            setsockopt(
267                self.accept_fd.as_raw_socket() as _,
268                SOL_SOCKET,
269                SO_UPDATE_ACCEPT_CONTEXT,
270                &fd as *const _ as _,
271                std::mem::size_of_val(&fd) as _,
272            )
273        )?;
274        Ok(())
275    }
276
277    /// Get the remote address from the inner buffer.
278    pub fn into_addr(self) -> io::Result<(socket2::Socket, SockAddr)> {
279        let get_addrs_fn = GET_ADDRS
280            .get_or_try_init(|| get_wsa_fn(self.fd.as_raw_fd(), WSAID_GETACCEPTEXSOCKADDRS))?
281            .ok_or_else(|| {
282                io::Error::new(
283                    io::ErrorKind::Unsupported,
284                    "cannot retrieve GetAcceptExSockAddrs",
285                )
286            })?;
287        let mut local_addr: *mut SOCKADDR = null_mut();
288        let mut local_addr_len = 0;
289        let mut remote_addr: *mut SOCKADDR = null_mut();
290        let mut remote_addr_len = 0;
291        unsafe {
292            get_addrs_fn(
293                &self.buffer as *const _ as *const _,
294                0,
295                ACCEPT_ADDR_BUFFER_SIZE as _,
296                ACCEPT_ADDR_BUFFER_SIZE as _,
297                &mut local_addr,
298                &mut local_addr_len,
299                &mut remote_addr,
300                &mut remote_addr_len,
301            );
302        }
303        Ok((self.accept_fd, unsafe {
304            SockAddr::new(*remote_addr.cast::<SOCKADDR_STORAGE>(), remote_addr_len)
305        }))
306    }
307}
308
309impl<S: AsRawFd> OpCode for Accept<S> {
310    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
311        let accept_fn = ACCEPT_EX
312            .get_or_try_init(|| get_wsa_fn(self.fd.as_raw_fd(), WSAID_ACCEPTEX))?
313            .ok_or_else(|| {
314                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve AcceptEx")
315            })?;
316        let mut received = 0;
317        let res = accept_fn(
318            self.fd.as_raw_fd() as _,
319            self.accept_fd.as_raw_socket() as _,
320            self.get_unchecked_mut().buffer.as_mut_ptr() as _,
321            0,
322            ACCEPT_ADDR_BUFFER_SIZE as _,
323            ACCEPT_ADDR_BUFFER_SIZE as _,
324            &mut received,
325            optr,
326        );
327        win32_result(res, received)
328    }
329
330    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
331        cancel(self.fd.as_raw_fd(), optr)
332    }
333}
334
335static CONNECT_EX: OnceLock<LPFN_CONNECTEX> = OnceLock::new();
336
337impl<S: AsRawFd> Connect<S> {
338    /// Update connect context.
339    pub fn update_context(&self) -> io::Result<()> {
340        syscall!(
341            SOCKET,
342            setsockopt(
343                self.fd.as_raw_fd() as _,
344                SOL_SOCKET,
345                SO_UPDATE_CONNECT_CONTEXT,
346                null(),
347                0,
348            )
349        )?;
350        Ok(())
351    }
352}
353
354impl<S: AsRawFd> OpCode for Connect<S> {
355    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
356        let connect_fn = CONNECT_EX
357            .get_or_try_init(|| get_wsa_fn(self.fd.as_raw_fd(), WSAID_CONNECTEX))?
358            .ok_or_else(|| {
359                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve ConnectEx")
360            })?;
361        let mut sent = 0;
362        let res = connect_fn(
363            self.fd.as_raw_fd() as _,
364            self.addr.as_ptr(),
365            self.addr.len(),
366            null(),
367            0,
368            &mut sent,
369            optr,
370        );
371        win32_result(res, sent)
372    }
373
374    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
375        cancel(self.fd.as_raw_fd(), optr)
376    }
377}
378
379/// Receive data from remote.
380pub struct Recv<T: IoBufMut, S> {
381    pub(crate) fd: SharedFd<S>,
382    pub(crate) buffer: T,
383    _p: PhantomPinned,
384}
385
386impl<T: IoBufMut, S> Recv<T, S> {
387    /// Create [`Recv`].
388    pub fn new(fd: SharedFd<S>, buffer: T) -> Self {
389        Self {
390            fd,
391            buffer,
392            _p: PhantomPinned,
393        }
394    }
395}
396
397impl<T: IoBufMut, S> IntoInner for Recv<T, S> {
398    type Inner = T;
399
400    fn into_inner(self) -> Self::Inner {
401        self.buffer
402    }
403}
404
405impl<T: IoBufMut, S: AsRawFd> OpCode for Recv<T, S> {
406    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
407        let fd = self.fd.as_raw_fd();
408        let slice = self.get_unchecked_mut().buffer.as_mut_slice();
409        let mut transferred = 0;
410        let res = ReadFile(
411            fd,
412            slice.as_mut_ptr() as _,
413            slice.len() as _,
414            &mut transferred,
415            optr,
416        );
417        win32_result(res, transferred)
418    }
419
420    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
421        cancel(self.fd.as_raw_fd(), optr)
422    }
423}
424
425/// Receive data from remote into vectored buffer.
426pub struct RecvVectored<T: IoVectoredBufMut, S> {
427    pub(crate) fd: SharedFd<S>,
428    pub(crate) buffer: T,
429    _p: PhantomPinned,
430}
431
432impl<T: IoVectoredBufMut, S> RecvVectored<T, S> {
433    /// Create [`RecvVectored`].
434    pub fn new(fd: SharedFd<S>, buffer: T) -> Self {
435        Self {
436            fd,
437            buffer,
438            _p: PhantomPinned,
439        }
440    }
441}
442
443impl<T: IoVectoredBufMut, S> IntoInner for RecvVectored<T, S> {
444    type Inner = T;
445
446    fn into_inner(self) -> Self::Inner {
447        self.buffer
448    }
449}
450
451impl<T: IoVectoredBufMut, S: AsRawFd> OpCode for RecvVectored<T, S> {
452    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
453        let fd = self.fd.as_raw_fd();
454        let slices = self.get_unchecked_mut().buffer.io_slices_mut();
455        let mut flags = 0;
456        let mut received = 0;
457        let res = WSARecv(
458            fd as _,
459            slices.as_ptr() as _,
460            slices.len() as _,
461            &mut received,
462            &mut flags,
463            optr,
464            None,
465        );
466        winsock_result(res, received)
467    }
468
469    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
470        cancel(self.fd.as_raw_fd(), optr)
471    }
472}
473
474/// Send data to remote.
475pub struct Send<T: IoBuf, S> {
476    pub(crate) fd: SharedFd<S>,
477    pub(crate) buffer: T,
478    _p: PhantomPinned,
479}
480
481impl<T: IoBuf, S> Send<T, S> {
482    /// Create [`Send`].
483    pub fn new(fd: SharedFd<S>, buffer: T) -> Self {
484        Self {
485            fd,
486            buffer,
487            _p: PhantomPinned,
488        }
489    }
490}
491
492impl<T: IoBuf, S> IntoInner for Send<T, S> {
493    type Inner = T;
494
495    fn into_inner(self) -> Self::Inner {
496        self.buffer
497    }
498}
499
500impl<T: IoBuf, S: AsRawFd> OpCode for Send<T, S> {
501    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
502        let slice = self.buffer.as_slice();
503        let mut transferred = 0;
504        let res = WriteFile(
505            self.fd.as_raw_fd(),
506            slice.as_ptr() as _,
507            slice.len() as _,
508            &mut transferred,
509            optr,
510        );
511        win32_result(res, transferred)
512    }
513
514    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
515        cancel(self.fd.as_raw_fd(), optr)
516    }
517}
518
519/// Send data to remote from vectored buffer.
520pub struct SendVectored<T: IoVectoredBuf, S> {
521    pub(crate) fd: SharedFd<S>,
522    pub(crate) buffer: T,
523    _p: PhantomPinned,
524}
525
526impl<T: IoVectoredBuf, S> SendVectored<T, S> {
527    /// Create [`SendVectored`].
528    pub fn new(fd: SharedFd<S>, buffer: T) -> Self {
529        Self {
530            fd,
531            buffer,
532            _p: PhantomPinned,
533        }
534    }
535}
536
537impl<T: IoVectoredBuf, S> IntoInner for SendVectored<T, S> {
538    type Inner = T;
539
540    fn into_inner(self) -> Self::Inner {
541        self.buffer
542    }
543}
544
545impl<T: IoVectoredBuf, S: AsRawFd> OpCode for SendVectored<T, S> {
546    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
547        let slices = self.buffer.io_slices();
548        let mut sent = 0;
549        let res = WSASend(
550            self.fd.as_raw_fd() as _,
551            slices.as_ptr() as _,
552            slices.len() as _,
553            &mut sent,
554            0,
555            optr,
556            None,
557        );
558        winsock_result(res, sent)
559    }
560
561    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
562        cancel(self.fd.as_raw_fd(), optr)
563    }
564}
565
566/// Receive data and source address.
567pub struct RecvFrom<T: IoBufMut, S> {
568    pub(crate) fd: SharedFd<S>,
569    pub(crate) buffer: T,
570    pub(crate) addr: SOCKADDR_STORAGE,
571    pub(crate) addr_len: socklen_t,
572    _p: PhantomPinned,
573}
574
575impl<T: IoBufMut, S> RecvFrom<T, S> {
576    /// Create [`RecvFrom`].
577    pub fn new(fd: SharedFd<S>, buffer: T) -> Self {
578        Self {
579            fd,
580            buffer,
581            addr: unsafe { std::mem::zeroed() },
582            addr_len: std::mem::size_of::<SOCKADDR_STORAGE>() as _,
583            _p: PhantomPinned,
584        }
585    }
586}
587
588impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
589    type Inner = (T, SOCKADDR_STORAGE, socklen_t);
590
591    fn into_inner(self) -> Self::Inner {
592        (self.buffer, self.addr, self.addr_len)
593    }
594}
595
596impl<T: IoBufMut, S: AsRawFd> OpCode for RecvFrom<T, S> {
597    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
598        let this = self.get_unchecked_mut();
599        let fd = this.fd.as_raw_fd();
600        let buffer = this.buffer.as_io_slice_mut();
601        let mut flags = 0;
602        let mut received = 0;
603        let res = WSARecvFrom(
604            fd as _,
605            &buffer as *const _ as _,
606            1,
607            &mut received,
608            &mut flags,
609            &mut this.addr as *mut _ as *mut SOCKADDR,
610            &mut this.addr_len,
611            optr,
612            None,
613        );
614        winsock_result(res, received)
615    }
616
617    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
618        cancel(self.fd.as_raw_fd(), optr)
619    }
620}
621
622/// Receive data and source address into vectored buffer.
623pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
624    pub(crate) fd: SharedFd<S>,
625    pub(crate) buffer: T,
626    pub(crate) addr: SOCKADDR_STORAGE,
627    pub(crate) addr_len: socklen_t,
628    _p: PhantomPinned,
629}
630
631impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
632    /// Create [`RecvFromVectored`].
633    pub fn new(fd: SharedFd<S>, buffer: T) -> Self {
634        Self {
635            fd,
636            buffer,
637            addr: unsafe { std::mem::zeroed() },
638            addr_len: std::mem::size_of::<SOCKADDR_STORAGE>() as _,
639            _p: PhantomPinned,
640        }
641    }
642}
643
644impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
645    type Inner = (T, SOCKADDR_STORAGE, socklen_t);
646
647    fn into_inner(self) -> Self::Inner {
648        (self.buffer, self.addr, self.addr_len)
649    }
650}
651
652impl<T: IoVectoredBufMut, S: AsRawFd> OpCode for RecvFromVectored<T, S> {
653    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
654        let this = self.get_unchecked_mut();
655        let fd = this.fd.as_raw_fd();
656        let buffer = this.buffer.io_slices_mut();
657        let mut flags = 0;
658        let mut received = 0;
659        let res = WSARecvFrom(
660            fd as _,
661            buffer.as_ptr() as _,
662            buffer.len() as _,
663            &mut received,
664            &mut flags,
665            &mut this.addr as *mut _ as *mut SOCKADDR,
666            &mut this.addr_len,
667            optr,
668            None,
669        );
670        winsock_result(res, received)
671    }
672
673    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
674        cancel(self.fd.as_raw_fd(), optr)
675    }
676}
677
678/// Send data to specified address.
679pub struct SendTo<T: IoBuf, S> {
680    pub(crate) fd: SharedFd<S>,
681    pub(crate) buffer: T,
682    pub(crate) addr: SockAddr,
683    _p: PhantomPinned,
684}
685
686impl<T: IoBuf, S> SendTo<T, S> {
687    /// Create [`SendTo`].
688    pub fn new(fd: SharedFd<S>, buffer: T, addr: SockAddr) -> Self {
689        Self {
690            fd,
691            buffer,
692            addr,
693            _p: PhantomPinned,
694        }
695    }
696}
697
698impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
699    type Inner = T;
700
701    fn into_inner(self) -> Self::Inner {
702        self.buffer
703    }
704}
705
706impl<T: IoBuf, S: AsRawFd> OpCode for SendTo<T, S> {
707    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
708        let buffer = self.buffer.as_io_slice();
709        let mut sent = 0;
710        let res = WSASendTo(
711            self.fd.as_raw_fd() as _,
712            &buffer as *const _ as _,
713            1,
714            &mut sent,
715            0,
716            self.addr.as_ptr(),
717            self.addr.len(),
718            optr,
719            None,
720        );
721        winsock_result(res, sent)
722    }
723
724    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
725        cancel(self.fd.as_raw_fd(), optr)
726    }
727}
728
729/// Send data to specified address from vectored buffer.
730pub struct SendToVectored<T: IoVectoredBuf, S> {
731    pub(crate) fd: SharedFd<S>,
732    pub(crate) buffer: T,
733    pub(crate) addr: SockAddr,
734    _p: PhantomPinned,
735}
736
737impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
738    /// Create [`SendToVectored`].
739    pub fn new(fd: SharedFd<S>, buffer: T, addr: SockAddr) -> Self {
740        Self {
741            fd,
742            buffer,
743            addr,
744            _p: PhantomPinned,
745        }
746    }
747}
748
749impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
750    type Inner = T;
751
752    fn into_inner(self) -> Self::Inner {
753        self.buffer
754    }
755}
756
757impl<T: IoVectoredBuf, S: AsRawFd> OpCode for SendToVectored<T, S> {
758    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
759        let buffer = self.buffer.io_slices();
760        let mut sent = 0;
761        let res = WSASendTo(
762            self.fd.as_raw_fd() as _,
763            buffer.as_ptr() as _,
764            buffer.len() as _,
765            &mut sent,
766            0,
767            self.addr.as_ptr(),
768            self.addr.len(),
769            optr,
770            None,
771        );
772        winsock_result(res, sent)
773    }
774
775    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
776        cancel(self.fd.as_raw_fd(), optr)
777    }
778}
779
780static WSA_RECVMSG: OnceLock<LPFN_WSARECVMSG> = OnceLock::new();
781
782/// Receive data and source address with ancillary data into vectored buffer.
783pub struct RecvMsg<T: IoVectoredBufMut, C: IoBufMut, S> {
784    msg: WSAMSG,
785    addr: SOCKADDR_STORAGE,
786    fd: SharedFd<S>,
787    buffer: T,
788    control: C,
789    _p: PhantomPinned,
790}
791
792impl<T: IoVectoredBufMut, C: IoBufMut, S> RecvMsg<T, C, S> {
793    /// Create [`RecvMsg`].
794    ///
795    /// # Panics
796    ///
797    /// This function will panic if the control message buffer is misaligned.
798    pub fn new(fd: SharedFd<S>, buffer: T, control: C) -> Self {
799        assert!(
800            control.as_buf_ptr().cast::<CMSGHDR>().is_aligned(),
801            "misaligned control message buffer"
802        );
803        Self {
804            msg: unsafe { std::mem::zeroed() },
805            addr: unsafe { std::mem::zeroed() },
806            fd,
807            buffer,
808            control,
809            _p: PhantomPinned,
810        }
811    }
812}
813
814impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
815    type Inner = ((T, C), SOCKADDR_STORAGE, socklen_t, usize);
816
817    fn into_inner(self) -> Self::Inner {
818        (
819            (self.buffer, self.control),
820            self.addr,
821            self.msg.namelen,
822            self.msg.Control.len as _,
823        )
824    }
825}
826
827impl<T: IoVectoredBufMut, C: IoBufMut, S: AsRawFd> OpCode for RecvMsg<T, C, S> {
828    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
829        let recvmsg_fn = WSA_RECVMSG
830            .get_or_try_init(|| get_wsa_fn(self.fd.as_raw_fd(), WSAID_WSARECVMSG))?
831            .ok_or_else(|| {
832                io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve WSARecvMsg")
833            })?;
834
835        let this = self.get_unchecked_mut();
836
837        let mut slices = this.buffer.io_slices_mut();
838        this.msg.name = &mut this.addr as *mut _ as _;
839        this.msg.namelen = std::mem::size_of::<SOCKADDR_STORAGE>() as _;
840        this.msg.lpBuffers = slices.as_mut_ptr() as _;
841        this.msg.dwBufferCount = slices.len() as _;
842        this.msg.Control =
843            std::mem::transmute::<IoSliceMut, WSABUF>(this.control.as_io_slice_mut());
844
845        let mut received = 0;
846        let res = recvmsg_fn(
847            this.fd.as_raw_fd() as _,
848            &mut this.msg,
849            &mut received,
850            optr,
851            None,
852        );
853        winsock_result(res, received)
854    }
855
856    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
857        cancel(self.fd.as_raw_fd(), optr)
858    }
859}
860
861/// Send data to specified address accompanied by ancillary data from vectored
862/// buffer.
863pub struct SendMsg<T: IoVectoredBuf, C: IoBuf, S> {
864    fd: SharedFd<S>,
865    buffer: T,
866    control: C,
867    addr: SockAddr,
868    _p: PhantomPinned,
869}
870
871impl<T: IoVectoredBuf, C: IoBuf, S> SendMsg<T, C, S> {
872    /// Create [`SendMsg`].
873    ///
874    /// # Panics
875    ///
876    /// This function will panic if the control message buffer is misaligned.
877    pub fn new(fd: SharedFd<S>, buffer: T, control: C, addr: SockAddr) -> Self {
878        assert!(
879            control.as_buf_ptr().cast::<CMSGHDR>().is_aligned(),
880            "misaligned control message buffer"
881        );
882        Self {
883            fd,
884            buffer,
885            control,
886            addr,
887            _p: PhantomPinned,
888        }
889    }
890}
891
892impl<T: IoVectoredBuf, C: IoBuf, S> IntoInner for SendMsg<T, C, S> {
893    type Inner = (T, C);
894
895    fn into_inner(self) -> Self::Inner {
896        (self.buffer, self.control)
897    }
898}
899
900impl<T: IoVectoredBuf, C: IoBuf, S: AsRawFd> OpCode for SendMsg<T, C, S> {
901    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
902        let this = self.get_unchecked_mut();
903
904        let slices = this.buffer.io_slices();
905        let msg = WSAMSG {
906            name: this.addr.as_ptr() as _,
907            namelen: this.addr.len(),
908            lpBuffers: slices.as_ptr() as _,
909            dwBufferCount: slices.len() as _,
910            Control: std::mem::transmute::<IoSlice, WSABUF>(this.control.as_io_slice()),
911            dwFlags: 0,
912        };
913
914        let mut sent = 0;
915        let res = WSASendMsg(this.fd.as_raw_fd() as _, &msg, 0, &mut sent, optr, None);
916        winsock_result(res, sent)
917    }
918
919    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
920        cancel(self.fd.as_raw_fd(), optr)
921    }
922}
923
924/// Connect a named pipe server.
925pub struct ConnectNamedPipe<S> {
926    pub(crate) fd: SharedFd<S>,
927}
928
929impl<S> ConnectNamedPipe<S> {
930    /// Create [`ConnectNamedPipe`](struct@ConnectNamedPipe).
931    pub fn new(fd: SharedFd<S>) -> Self {
932        Self { fd }
933    }
934}
935
936impl<S: AsRawFd> OpCode for ConnectNamedPipe<S> {
937    unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
938        let res = ConnectNamedPipe(self.fd.as_raw_fd() as _, optr);
939        win32_result(res, 0)
940    }
941
942    unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
943        cancel(self.fd.as_raw_fd(), optr)
944    }
945}