#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
#![warn(missing_docs)]
// Build-time guard: on Linux at least one of the `io-uring` / `polling`
// features has to be enabled. When neither is, abort compilation with an
// explicit message instead of continuing with an unintended configuration.
#[cfg(all(
target_os = "linux",
not(feature = "io-uring"),
not(feature = "polling")
))]
compile_error!("You must choose at least one of these features: [\"io-uring\", \"polling\"]");
use std::{
io,
task::{Poll, Waker},
time::Duration,
};
use compio_buf::BufResult;
use compio_log::instrument;
mod key;
pub use key::Key;
pub mod op;
#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(all())))]
mod unix;
#[cfg(unix)]
use unix::Overlapped;
mod asyncify;
pub use asyncify::*;
mod fd;
pub use fd::*;
mod driver_type;
pub use driver_type::*;
// Pick exactly one backend implementation and mount it as the private module
// `sys`. The branches are evaluated top to bottom and the first match wins:
//   * Windows                                        -> `iocp/mod.rs`
//   * Linux with both `polling` and `io-uring`       -> `fusion/mod.rs`
//   * Linux with `io-uring` (and without `polling`)  -> `iour/mod.rs`
//   * every remaining Unix configuration             -> `poll/mod.rs`
cfg_if::cfg_if! {
if #[cfg(windows)] {
#[path = "iocp/mod.rs"]
mod sys;
} else if #[cfg(all(target_os = "linux", feature = "polling", feature = "io-uring"))] {
// Must be tested before the plain `io-uring` branch below, otherwise that
// branch would also match when both features are enabled.
#[path = "fusion/mod.rs"]
mod sys;
} else if #[cfg(all(target_os = "linux", feature = "io-uring"))] {
#[path = "iour/mod.rs"]
mod sys;
} else if #[cfg(unix)] {
#[path = "poll/mod.rs"]
mod sys;
}
}
// Re-export the public items of whichever backend was selected at the crate root.
pub use sys::*;
/// Evaluates a raw Windows API call and converts its return value into an
/// `io::Result`.
///
/// Supported forms:
///
/// * `syscall!(BOOL, expr)` - fails when `expr` evaluates to `0`.
/// * `syscall!(SOCKET, expr)` - fails when `expr` evaluates to anything other
///   than `0`.
/// * `syscall!(HANDLE, expr)` - fails when `expr` evaluates to
///   `INVALID_HANDLE_VALUE`.
/// * `syscall!(expr, <op> <rhs>)` - generic form: fails when `expr <op> rhs`
///   is true.
///
/// On failure the result is `Err(io::Error::last_os_error())`; otherwise it is
/// `Ok` holding the value `expr` produced.
#[cfg(windows)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
// The keyword arms come first: macro arms are tried in order, and the generic
// arm at the bottom starts with a bare `$e:expr`.
(BOOL, $e:expr) => {
$crate::syscall!($e, == 0)
};
(SOCKET, $e:expr) => {
$crate::syscall!($e, != 0)
};
(HANDLE, $e:expr) => {
$crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
};
// Generic form: `$op $rhs` is the *failure* condition applied to the result.
($e:expr, $op: tt $rhs: expr) => {{
// The call is wrapped in `unsafe` here so callers can pass FFI calls
// directly; the `allow` keeps call sites that are already inside an
// `unsafe` block free of `unused_unsafe` warnings.
#[allow(unused_unsafe)]
let res = unsafe { $e };
if res $op $rhs {
Err(::std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
/// Evaluates a raw libc-style call and converts its return value into an
/// `io::Result` (or a `Poll` / `Decision`, depending on the form used).
///
/// Supported forms:
///
/// * `syscall!(expr)` - evaluates `expr` inside `unsafe`; a result of `-1`
///   becomes `Err(io::Error::last_os_error())`, anything else `Ok(result)`.
/// * `syscall!(break expr)` - evaluates `expr` in a loop and yields a
///   `Poll<io::Result<usize>>`:
///   success is `Poll::Ready(Ok(result as usize))`, `WouldBlock` or
///   `EINPROGRESS` is `Poll::Pending`, `Interrupted` retries the call, and any
///   other error is `Poll::Ready(Err(_))`.
/// * `syscall!(expr, variant(fd))` - runs the `break` form and maps it to
///   `io::Result<Decision>`: `Pending` becomes `Ok(Decision::variant(fd))`,
///   a ready success becomes `Ok(Decision::Completed(result))`, and a ready
///   error is returned as `Err`.
#[cfg(unix)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
// Listed before the bare `$e:expr` arm: arms are tried in order, and this one
// has to claim input that starts with the `break` keyword.
(break $e:expr) => {
loop {
match $crate::syscall!($e) {
Ok(fd) => break ::std::task::Poll::Ready(Ok(fd as usize)),
// The call could not complete right now; report `Pending` to the caller.
Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS)
=> break ::std::task::Poll::Pending,
// Interrupted: fall through to the next loop iteration and retry the call.
Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => {},
Err(e) => break ::std::task::Poll::Ready(Err(e)),
}
}
};
// `$f` names the `Decision` variant to construct (with `$fd`) when the call
// did not complete immediately.
($e:expr, $f:ident($fd:expr)) => {
match $crate::syscall!(break $e) {
::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)),
::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)),
::std::task::Poll::Ready(Err(e)) => Err(e),
}
};
($e:expr) => {{
// Wrapped in `unsafe` so callers can pass FFI calls directly; the `allow`
// silences `unused_unsafe` when the call site is already unsafe.
#[allow(unused_unsafe)]
let res = unsafe { $e };
// `-1` is the failure sentinel checked here; the OS error is then read.
if res == -1 {
Err(::std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
/// Implements the raw-descriptor conversion traits for a wrapper type by
/// delegating to one of its fields.
///
/// Arguments: `$t` is the wrapper type, `$it` the type parameter used for
/// `ToSharedFd` / `SharedFd`, and `$inner` the name of the field to delegate
/// to.
///
/// Supported forms:
///
/// * `impl_raw_fd!(T, Inner, field)` - implements `AsRawFd` and
///   `ToSharedFd<Inner>` for `T`, plus `std::os::fd::FromRawFd` on Unix.
/// * `impl_raw_fd!(T, Inner, field, file)` - everything above, plus
///   `FromRawHandle` / `AsRawHandle` on Windows.
/// * `impl_raw_fd!(T, Inner, field, socket)` - everything above, plus
///   `FromRawSocket` / `AsRawSocket` on Windows.
///
/// The generated `from_raw_*` constructors build `Self { field: .. }` with
/// only that one field, so `T` must not have any other fields.
#[macro_export]
#[doc(hidden)]
macro_rules! impl_raw_fd {
// Base form shared by the `file` and `socket` forms below.
($t:ty, $it:ty, $inner:ident) => {
impl $crate::AsRawFd for $t {
fn as_raw_fd(&self) -> $crate::RawFd {
self.$inner.as_raw_fd()
}
}
#[cfg(unix)]
impl std::os::fd::FromRawFd for $t {
unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self {
Self {
$inner: std::os::fd::FromRawFd::from_raw_fd(fd),
}
}
}
impl $crate::ToSharedFd<$it> for $t {
fn to_shared_fd(&self) -> $crate::SharedFd<$it> {
self.$inner.to_shared_fd()
}
}
};
// File-like wrappers: add the Windows handle conversions.
($t:ty, $it:ty, $inner:ident,file) => {
$crate::impl_raw_fd!($t, $it, $inner);
#[cfg(windows)]
impl std::os::windows::io::FromRawHandle for $t {
unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
Self {
$inner: std::os::windows::io::FromRawHandle::from_raw_handle(handle),
}
}
}
#[cfg(windows)]
impl std::os::windows::io::AsRawHandle for $t {
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
self.$inner.as_raw_handle()
}
}
};
// Socket-like wrappers: add the Windows socket conversions.
($t:ty, $it:ty, $inner:ident,socket) => {
$crate::impl_raw_fd!($t, $it, $inner);
#[cfg(windows)]
impl std::os::windows::io::FromRawSocket for $t {
unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self {
Self {
$inner: std::os::windows::io::FromRawSocket::from_raw_socket(sock),
}
}
}
#[cfg(windows)]
impl std::os::windows::io::AsRawSocket for $t {
fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
self.$inner.as_raw_socket()
}
}
};
}
/// A value that is either still pending, identified by a key of type `K`, or
/// already finished with a result of type `R`.
///
/// This is the return type of [`Proactor::push`] and [`Proactor::pop`].
pub enum PushEntry<K, R> {
    /// Not finished yet; carries the key.
    Pending(K),
    /// Finished; carries the result.
    Ready(R),
}

impl<K, R> PushEntry<K, R> {
    /// Returns `true` if this is the [`PushEntry::Ready`] variant.
    pub const fn is_ready(&self) -> bool {
        match self {
            Self::Ready(_) => true,
            Self::Pending(_) => false,
        }
    }

    /// Consumes the entry and returns the result if it is ready.
    ///
    /// A pending entry yields `None`; its key is dropped.
    pub fn take_ready(self) -> Option<R> {
        if let Self::Ready(value) = self {
            Some(value)
        } else {
            None
        }
    }

    /// Transforms the key of a pending entry with `f`.
    ///
    /// A ready entry is passed through unchanged and `f` is not called.
    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
        match self {
            Self::Ready(value) => PushEntry::Ready(value),
            Self::Pending(key) => {
                let mapped = f(key);
                PushEntry::Pending(mapped)
            }
        }
    }

    /// Transforms the result of a ready entry with `f`.
    ///
    /// A pending entry is passed through unchanged and `f` is not called.
    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
        match self {
            Self::Ready(value) => {
                let mapped = f(value);
                PushEntry::Ready(mapped)
            }
            Self::Pending(key) => PushEntry::Pending(key),
        }
    }
}
/// Front-end over the platform-specific [`Driver`].
///
/// Operations are submitted with [`Proactor::push`], driven with
/// [`Proactor::poll`] and collected with [`Proactor::pop`].
pub struct Proactor {
    driver: Driver,
}

impl Proactor {
    /// Creates a proactor from a [`ProactorBuilder`] with default settings.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying driver constructor reports.
    pub fn new() -> io::Result<Self> {
        ProactorBuilder::new().build()
    }

    /// Returns a new [`ProactorBuilder`] for customising the proactor.
    pub fn builder() -> ProactorBuilder {
        ProactorBuilder::new()
    }

    /// Builds the proactor by constructing the driver from `builder`.
    fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
        let driver = Driver::new(builder)?;
        Ok(Self { driver })
    }

    /// Attaches `fd` to the underlying driver.
    ///
    /// # Errors
    ///
    /// Forwards the error returned by the driver.
    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
        self.driver.attach(fd)
    }

    /// Cancels the operation identified by `op`.
    ///
    /// Returns `Some(result)` when `Key::set_cancelled` reports `true`; in
    /// that case the operation is taken back from the key. Otherwise the
    /// cancellation is forwarded to the driver and `None` is returned.
    pub fn cancel<T: OpCode>(&mut self, mut op: Key<T>) -> Option<BufResult<usize, T>> {
        instrument!(compio_log::Level::DEBUG, "cancel", ?op);
        if !op.set_cancelled() {
            // Hand the driver a type-erased key built from the same user data.
            let mut erased = unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) };
            self.driver.cancel(&mut erased);
            return None;
        }
        // NOTE(review): `set_cancelled` returning `true` is presumably the
        // "already completed" case that makes `into_inner` sound here; confirm
        // against `Key`.
        Some(unsafe { op.into_inner() })
    }

    /// Submits `op` to the driver.
    ///
    /// Returns [`PushEntry::Ready`] with the result when the driver completes
    /// the operation immediately, otherwise [`PushEntry::Pending`] with the
    /// key that identifies the in-flight operation.
    pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
        let mut key = self.driver.create_op(op);
        // The driver works on a type-erased key built from the same user data.
        let mut erased = unsafe { Key::<dyn OpCode>::new_unchecked(key.user_data()) };
        if let Poll::Ready(res) = self.driver.push(&mut erased) {
            key.set_result(res);
            // NOTE(review): the result was stored on the line above, which is
            // presumably the precondition of `into_inner`; confirm against `Key`.
            PushEntry::Ready(unsafe { key.into_inner() })
        } else {
            PushEntry::Pending(key)
        }
    }

    /// Polls the driver, passing `timeout` through unchanged.
    ///
    /// # Errors
    ///
    /// Forwards the error returned by the driver.
    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        unsafe { self.driver.poll(timeout) }
    }

    /// Tries to collect the result of the operation identified by `op`.
    ///
    /// Returns [`PushEntry::Ready`] with the result and the key's flags when
    /// the key has a result, otherwise hands the key back as
    /// [`PushEntry::Pending`].
    pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
        instrument!(compio_log::Level::DEBUG, "pop", ?op);
        if !op.has_result() {
            return PushEntry::Pending(op);
        }
        // Read the flags first: `into_inner` consumes the key.
        let flags = op.flags();
        PushEntry::Ready((unsafe { op.into_inner() }, flags))
    }

    /// Stores `waker` on the key `op`.
    pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: Waker) {
        op.set_waker(waker);
    }

    /// Returns a [`NotifyHandle`] obtained from the driver.
    ///
    /// # Errors
    ///
    /// Forwards the error returned by the driver.
    pub fn handle(&self) -> io::Result<NotifyHandle> {
        self.driver.handle()
    }
}

impl AsRawFd for Proactor {
    /// Returns the raw descriptor of the underlying driver.
    fn as_raw_fd(&self) -> RawFd {
        self.driver.as_raw_fd()
    }
}
/// A completion record: the user data identifying an operation, the result of
/// that operation, and a set of flags.
#[derive(Debug)]
pub(crate) struct Entry {
// Raw value later turned back into a `Key` by `notify`.
user_data: usize,
// Outcome of the operation.
result: io::Result<usize>,
// Extra flags; `0` unless `set_flags` was called.
flags: u32,
}
impl Entry {
/// Creates an entry for `user_data` with the given `result` and flags set
/// to `0`.
pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
Self {
user_data,
result,
flags: 0,
}
}
/// Overwrites the flags. Only compiled on Linux with the `io-uring` feature.
#[cfg(all(target_os = "linux", feature = "io-uring"))]
pub(crate) fn set_flags(&mut self, flags: u32) {
self.flags = flags;
}
/// Returns the user data of the entry.
pub fn user_data(&self) -> usize {
self.user_data
}
/// Returns the flags of the entry.
pub fn flags(&self) -> u32 {
self.flags
}
/// Consumes the entry and returns its result.
pub fn into_result(self) -> io::Result<usize> {
self.result
}
/// Delivers this entry to the operation identified by its user data: the
/// flags and the result are stored on the corresponding key.
///
/// # Safety
///
/// `user_data` is turned into a key with `Key::new_unchecked`, so it must
/// satisfy whatever that function requires.
/// NOTE(review): presumably it must be the user data of a live operation
/// created by this driver; confirm against `Key::new_unchecked`.
pub unsafe fn notify(self) {
let user_data = self.user_data();
let mut op = Key::<()>::new_unchecked(user_data);
// Flags are copied before `into_result` consumes `self` below.
op.set_flags(self.flags());
// When `set_result` returns `true`, the boxed operation is reclaimed and
// dropped right here. NOTE(review): presumably this is the case where the
// operation was cancelled and nobody will collect it; confirm against
// `Key::set_result`.
if op.set_result(self.into_result()) {
let _ = op.into_box();
}
}
}
/// Describes how an [`AsyncifyPool`] is obtained: either created from
/// settings or cloned from an existing pool.
#[derive(Debug, Clone)]
enum ThreadPoolBuilder {
    /// Create a new pool with `AsyncifyPool::new(limit, recv_limit)`.
    Create {
        /// First argument passed to `AsyncifyPool::new`.
        limit: usize,
        /// Second argument passed to `AsyncifyPool::new`.
        recv_limit: Duration,
    },
    /// Hand out clones of this existing pool.
    Reuse(AsyncifyPool),
}

impl Default for ThreadPoolBuilder {
    /// Same as [`ThreadPoolBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl ThreadPoolBuilder {
    /// Returns the `Create` configuration with a limit of 256 and a receive
    /// limit of 60 seconds.
    pub fn new() -> Self {
        let limit = 256;
        let recv_limit = Duration::from_secs(60);
        Self::Create { limit, recv_limit }
    }

    /// Produces a pool according to the configuration: a new pool for
    /// `Create`, a clone of the stored pool for `Reuse`.
    pub fn create_or_reuse(&self) -> AsyncifyPool {
        match *self {
            Self::Reuse(ref pool) => pool.clone(),
            Self::Create { limit, recv_limit } => AsyncifyPool::new(limit, recv_limit),
        }
    }
}
/// Builder for [`Proactor`].
///
/// The setters take and return `&mut Self`, so they can be chained.
#[derive(Debug, Clone)]
pub struct ProactorBuilder {
    // Defaults to 1024. NOTE(review): consumed outside this file, presumably
    // by the backend driver.
    capacity: u32,
    // How the thread pool is obtained (created from settings or reused).
    pool_builder: ThreadPoolBuilder,
    // `None` unless `sqpoll_idle` was called. NOTE(review): presumably only
    // meaningful for the io-uring backend; confirm against the driver.
    sqpoll_idle: Option<Duration>,
}

impl Default for ProactorBuilder {
    /// Same as [`ProactorBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl ProactorBuilder {
    /// Creates a builder with a capacity of 1024, the default thread pool
    /// settings and no sqpoll idle duration.
    pub fn new() -> Self {
        let pool_builder = ThreadPoolBuilder::new();
        Self {
            capacity: 1024,
            pool_builder,
            sqpoll_idle: None,
        }
    }

    /// Sets the capacity.
    pub fn capacity(&mut self, capacity: u32) -> &mut Self {
        self.capacity = capacity;
        self
    }

    /// Sets the limit used when the thread pool is created.
    ///
    /// Has no effect if the builder is set to reuse an existing pool.
    pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
        match &mut self.pool_builder {
            ThreadPoolBuilder::Create { limit, .. } => *limit = value,
            ThreadPoolBuilder::Reuse(_) => {}
        }
        self
    }

    /// Sets the receive timeout used when the thread pool is created.
    ///
    /// Has no effect if the builder is set to reuse an existing pool.
    pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
        match &mut self.pool_builder {
            ThreadPoolBuilder::Create { recv_limit, .. } => *recv_limit = timeout,
            ThreadPoolBuilder::Reuse(_) => {}
        }
        self
    }

    /// Makes the builder reuse `pool` instead of creating a new one.
    pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
        self.pool_builder = ThreadPoolBuilder::Reuse(pool);
        self
    }

    /// Obtains a pool from the current configuration right away and switches
    /// the builder to reuse it from now on.
    pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
        let pool = self.create_or_get_thread_pool();
        self.reuse_thread_pool(pool)
    }

    /// Returns a pool according to the current configuration: a newly created
    /// one, or a clone of the pool set for reuse.
    pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
        self.pool_builder.create_or_reuse()
    }

    /// Sets the sqpoll idle duration.
    pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
        self.sqpoll_idle = Some(idle);
        self
    }

    /// Builds a [`Proactor`] from the current settings.
    ///
    /// # Errors
    ///
    /// Returns the error reported while constructing the proactor.
    pub fn build(&self) -> io::Result<Proactor> {
        Proactor::with_builder(self)
    }
}