compio_driver/
lib.rs

1//! The platform-specified driver.
2//! Some types differ by compilation target.
3
4#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
5#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
6#![warn(missing_docs)]
7
8#[cfg(all(
9    target_os = "linux",
10    not(feature = "io-uring"),
11    not(feature = "polling")
12))]
13compile_error!("You must choose at least one of these features: [\"io-uring\", \"polling\"]");
14
15use std::{
16    io,
17    task::{Poll, Waker},
18    time::Duration,
19};
20
21use compio_buf::BufResult;
22use compio_log::instrument;
23
24mod key;
25pub use key::Key;
26
27pub mod op;
28#[cfg(unix)]
29#[cfg_attr(docsrs, doc(cfg(all())))]
30mod unix;
31#[cfg(unix)]
32use unix::Overlapped;
33
34mod asyncify;
35pub use asyncify::*;
36
37mod fd;
38pub use fd::*;
39
40mod driver_type;
41pub use driver_type::*;
42
43cfg_if::cfg_if! {
44    if #[cfg(windows)] {
45        #[path = "iocp/mod.rs"]
46        mod sys;
47    } else if #[cfg(all(target_os = "linux", feature = "polling", feature = "io-uring"))] {
48        #[path = "fusion/mod.rs"]
49        mod sys;
50    } else if #[cfg(all(target_os = "linux", feature = "io-uring"))] {
51        #[path = "iour/mod.rs"]
52        mod sys;
53    } else if #[cfg(unix)] {
54        #[path = "poll/mod.rs"]
55        mod sys;
56    }
57}
58
59pub use sys::*;
60
61#[cfg(windows)]
62#[macro_export]
63#[doc(hidden)]
64macro_rules! syscall {
65    (BOOL, $e:expr) => {
66        $crate::syscall!($e, == 0)
67    };
68    (SOCKET, $e:expr) => {
69        $crate::syscall!($e, != 0)
70    };
71    (HANDLE, $e:expr) => {
72        $crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
73    };
74    ($e:expr, $op: tt $rhs: expr) => {{
75        #[allow(unused_unsafe)]
76        let res = unsafe { $e };
77        if res $op $rhs {
78            Err(::std::io::Error::last_os_error())
79        } else {
80            Ok(res)
81        }
82    }};
83}
84
85/// Helper macro to execute a system call
86#[cfg(unix)]
87#[macro_export]
88#[doc(hidden)]
89macro_rules! syscall {
90    (break $e:expr) => {
91        loop {
92            match $crate::syscall!($e) {
93                Ok(fd) => break ::std::task::Poll::Ready(Ok(fd as usize)),
94                Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS)
95                    => break ::std::task::Poll::Pending,
96                Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => {},
97                Err(e) => break ::std::task::Poll::Ready(Err(e)),
98            }
99        }
100    };
101    ($e:expr, $f:ident($fd:expr)) => {
102        match $crate::syscall!(break $e) {
103            ::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)),
104            ::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)),
105            ::std::task::Poll::Ready(Err(e)) => Err(e),
106        }
107    };
108    ($e:expr) => {{
109        #[allow(unused_unsafe)]
110        let res = unsafe { $e };
111        if res == -1 {
112            Err(::std::io::Error::last_os_error())
113        } else {
114            Ok(res)
115        }
116    }};
117}
118
119#[macro_export]
120#[doc(hidden)]
121macro_rules! impl_raw_fd {
122    ($t:ty, $it:ty, $inner:ident) => {
123        impl $crate::AsRawFd for $t {
124            fn as_raw_fd(&self) -> $crate::RawFd {
125                self.$inner.as_raw_fd()
126            }
127        }
128        #[cfg(unix)]
129        impl std::os::fd::FromRawFd for $t {
130            unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self {
131                Self {
132                    $inner: std::os::fd::FromRawFd::from_raw_fd(fd),
133                }
134            }
135        }
136        impl $crate::ToSharedFd<$it> for $t {
137            fn to_shared_fd(&self) -> $crate::SharedFd<$it> {
138                self.$inner.to_shared_fd()
139            }
140        }
141    };
142    ($t:ty, $it:ty, $inner:ident,file) => {
143        $crate::impl_raw_fd!($t, $it, $inner);
144        #[cfg(windows)]
145        impl std::os::windows::io::FromRawHandle for $t {
146            unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
147                Self {
148                    $inner: std::os::windows::io::FromRawHandle::from_raw_handle(handle),
149                }
150            }
151        }
152        #[cfg(windows)]
153        impl std::os::windows::io::AsRawHandle for $t {
154            fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
155                self.$inner.as_raw_handle()
156            }
157        }
158    };
159    ($t:ty, $it:ty, $inner:ident,socket) => {
160        $crate::impl_raw_fd!($t, $it, $inner);
161        #[cfg(windows)]
162        impl std::os::windows::io::FromRawSocket for $t {
163            unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self {
164                Self {
165                    $inner: std::os::windows::io::FromRawSocket::from_raw_socket(sock),
166                }
167            }
168        }
169        #[cfg(windows)]
170        impl std::os::windows::io::AsRawSocket for $t {
171            fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
172                self.$inner.as_raw_socket()
173            }
174        }
175    };
176}
177
178/// The return type of [`Proactor::push`].
179pub enum PushEntry<K, R> {
180    /// The operation is pushed to the submission queue.
181    Pending(K),
182    /// The operation is ready and returns.
183    Ready(R),
184}
185
186impl<K, R> PushEntry<K, R> {
187    /// Get if the current variant is [`PushEntry::Ready`].
188    pub const fn is_ready(&self) -> bool {
189        matches!(self, Self::Ready(_))
190    }
191
192    /// Take the ready variant if exists.
193    pub fn take_ready(self) -> Option<R> {
194        match self {
195            Self::Pending(_) => None,
196            Self::Ready(res) => Some(res),
197        }
198    }
199
200    /// Map the [`PushEntry::Pending`] branch.
201    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
202        match self {
203            Self::Pending(k) => PushEntry::Pending(f(k)),
204            Self::Ready(r) => PushEntry::Ready(r),
205        }
206    }
207
208    /// Map the [`PushEntry::Ready`] branch.
209    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
210        match self {
211            Self::Pending(k) => PushEntry::Pending(k),
212            Self::Ready(r) => PushEntry::Ready(f(r)),
213        }
214    }
215}
216
217/// Low-level actions of completion-based IO.
218/// It owns the operations to keep the driver safe.
219pub struct Proactor {
220    driver: Driver,
221}
222
223impl Proactor {
224    /// Create [`Proactor`] with 1024 entries.
225    pub fn new() -> io::Result<Self> {
226        Self::builder().build()
227    }
228
229    /// Create [`ProactorBuilder`] to config the proactor.
230    pub fn builder() -> ProactorBuilder {
231        ProactorBuilder::new()
232    }
233
234    fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
235        Ok(Self {
236            driver: Driver::new(builder)?,
237        })
238    }
239
240    /// Attach an fd to the driver.
241    ///
242    /// ## Platform specific
243    /// * IOCP: it will be attached to the completion port. An fd could only be
244    ///   attached to one driver, and could only be attached once, even if you
245    ///   `try_clone` it.
246    /// * io-uring & polling: it will do nothing but return `Ok(())`.
247    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
248        self.driver.attach(fd)
249    }
250
251    /// Cancel an operation with the pushed user-defined data.
252    ///
253    /// The cancellation is not reliable. The underlying operation may continue,
254    /// but just don't return from [`Proactor::poll`]. Therefore, although an
255    /// operation is cancelled, you should not reuse its `user_data`.
256    pub fn cancel<T: OpCode>(&mut self, mut op: Key<T>) -> Option<BufResult<usize, T>> {
257        instrument!(compio_log::Level::DEBUG, "cancel", ?op);
258        if op.set_cancelled() {
259            // SAFETY: completed.
260            Some(unsafe { op.into_inner() })
261        } else {
262            self.driver
263                .cancel(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) });
264            None
265        }
266    }
267
268    /// Push an operation into the driver, and return the unique key, called
269    /// user-defined data, associated with it.
270    pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
271        let mut op = self.driver.create_op(op);
272        match self
273            .driver
274            .push(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) })
275        {
276            Poll::Pending => PushEntry::Pending(op),
277            Poll::Ready(res) => {
278                op.set_result(res);
279                // SAFETY: just completed.
280                PushEntry::Ready(unsafe { op.into_inner() })
281            }
282        }
283    }
284
285    /// Poll the driver and get completed entries.
286    /// You need to call [`Proactor::pop`] to get the pushed
287    /// operations.
288    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
289        unsafe { self.driver.poll(timeout) }
290    }
291
292    /// Get the pushed operations from the completion entries.
293    ///
294    /// # Panics
295    /// This function will panic if the requested operation has not been
296    /// completed.
297    pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
298        instrument!(compio_log::Level::DEBUG, "pop", ?op);
299        if op.has_result() {
300            let flags = op.flags();
301            // SAFETY: completed.
302            PushEntry::Ready((unsafe { op.into_inner() }, flags))
303        } else {
304            PushEntry::Pending(op)
305        }
306    }
307
308    /// Update the waker of the specified op.
309    pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: Waker) {
310        op.set_waker(waker);
311    }
312
313    /// Create a notify handle to interrupt the inner driver.
314    pub fn handle(&self) -> io::Result<NotifyHandle> {
315        self.driver.handle()
316    }
317}
318
319impl AsRawFd for Proactor {
320    fn as_raw_fd(&self) -> RawFd {
321        self.driver.as_raw_fd()
322    }
323}
324
325/// An completed entry returned from kernel.
326#[derive(Debug)]
327pub(crate) struct Entry {
328    user_data: usize,
329    result: io::Result<usize>,
330    flags: u32,
331}
332
333impl Entry {
334    pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
335        Self {
336            user_data,
337            result,
338            flags: 0,
339        }
340    }
341
342    #[cfg(all(target_os = "linux", feature = "io-uring"))]
343    // this method only used by in io-uring driver
344    pub(crate) fn set_flags(&mut self, flags: u32) {
345        self.flags = flags;
346    }
347
348    /// The user-defined data returned by [`Proactor::push`].
349    pub fn user_data(&self) -> usize {
350        self.user_data
351    }
352
353    pub fn flags(&self) -> u32 {
354        self.flags
355    }
356
357    /// The result of the operation.
358    pub fn into_result(self) -> io::Result<usize> {
359        self.result
360    }
361
362    /// SAFETY: `user_data` should be a valid pointer.
363    pub unsafe fn notify(self) {
364        let user_data = self.user_data();
365        let mut op = Key::<()>::new_unchecked(user_data);
366        op.set_flags(self.flags());
367        if op.set_result(self.into_result()) {
368            // SAFETY: completed and cancelled.
369            let _ = op.into_box();
370        }
371    }
372}
373
374#[derive(Debug, Clone)]
375enum ThreadPoolBuilder {
376    Create { limit: usize, recv_limit: Duration },
377    Reuse(AsyncifyPool),
378}
379
380impl Default for ThreadPoolBuilder {
381    fn default() -> Self {
382        Self::new()
383    }
384}
385
386impl ThreadPoolBuilder {
387    pub fn new() -> Self {
388        Self::Create {
389            limit: 256,
390            recv_limit: Duration::from_secs(60),
391        }
392    }
393
394    pub fn create_or_reuse(&self) -> AsyncifyPool {
395        match self {
396            Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
397            Self::Reuse(pool) => pool.clone(),
398        }
399    }
400}
401
402/// Builder for [`Proactor`].
403#[derive(Debug, Clone)]
404pub struct ProactorBuilder {
405    capacity: u32,
406    pool_builder: ThreadPoolBuilder,
407    sqpoll_idle: Option<Duration>,
408}
409
410impl Default for ProactorBuilder {
411    fn default() -> Self {
412        Self::new()
413    }
414}
415
416impl ProactorBuilder {
417    /// Create the builder with default config.
418    pub fn new() -> Self {
419        Self {
420            capacity: 1024,
421            pool_builder: ThreadPoolBuilder::new(),
422            sqpoll_idle: None,
423        }
424    }
425
426    /// Set the capacity of the inner event queue or submission queue, if
427    /// exists. The default value is 1024.
428    pub fn capacity(&mut self, capacity: u32) -> &mut Self {
429        self.capacity = capacity;
430        self
431    }
432
433    /// Set the thread number limit of the inner thread pool, if exists. The
434    /// default value is 256.
435    ///
436    /// It will be ignored if `reuse_thread_pool` is set.
437    pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
438        if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
439            *limit = value;
440        }
441        self
442    }
443
444    /// Set the waiting timeout of the inner thread, if exists. The default is
445    /// 60 seconds.
446    ///
447    /// It will be ignored if `reuse_thread_pool` is set.
448    pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
449        if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
450            *recv_limit = timeout;
451        }
452        self
453    }
454
455    /// Set to reuse an existing [`AsyncifyPool`] in this proactor.
456    pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
457        self.pool_builder = ThreadPoolBuilder::Reuse(pool);
458        self
459    }
460
461    /// Force reuse the thread pool for each proactor created by this builder,
462    /// even `reuse_thread_pool` is not set.
463    pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
464        self.reuse_thread_pool(self.create_or_get_thread_pool());
465        self
466    }
467
468    /// Create or reuse the thread pool from the config.
469    pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
470        self.pool_builder.create_or_reuse()
471    }
472
473    /// Set `io-uring` sqpoll idle milliseconds, when `sqpoll_idle` is set,
474    /// io-uring sqpoll feature will be enabled
475    ///
476    /// # Notes
477    ///
478    /// - Only effective when the `io-uring` feature is enabled
479    /// - `idle` must >= 1ms, otherwise will set sqpoll idle 0ms
480    /// - `idle` will be rounded down
481    pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
482        self.sqpoll_idle = Some(idle);
483        self
484    }
485
486    /// Build the [`Proactor`].
487    pub fn build(&self) -> io::Result<Proactor> {
488        Proactor::with_builder(self)
489    }
490}