1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
5#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
6#![warn(missing_docs)]
7
// Build-time guard: when compiling for Linux with neither the `io-uring` nor
// the `polling` feature enabled, abort compilation with an explicit message
// instead of failing later on a missing backend.
#[cfg(all(
    target_os = "linux",
    not(feature = "io-uring"),
    not(feature = "polling")
))]
compile_error!("You must choose at least one of these features: [\"io-uring\", \"polling\"]");
14
15use std::{
16 io,
17 task::{Poll, Waker},
18 time::Duration,
19};
20
21use compio_buf::BufResult;
22use compio_log::instrument;
23
24mod key;
25pub use key::Key;
26
27pub mod op;
28#[cfg(unix)]
29#[cfg_attr(docsrs, doc(cfg(all())))]
30mod unix;
31#[cfg(unix)]
32use unix::Overlapped;
33
34mod asyncify;
35pub use asyncify::*;
36
37mod fd;
38pub use fd::*;
39
40mod driver_type;
41pub use driver_type::*;
42
// Pick exactly one backend implementation and mount it as the private module
// `sys`. The branches are tried in order, so the first matching one wins:
//   1. Windows                                  -> `iocp/mod.rs`
//   2. Linux with both `polling` and `io-uring` -> `fusion/mod.rs`
//   3. Linux with `io-uring` only               -> `iour/mod.rs`
//   4. any other Unix configuration             -> `poll/mod.rs`
// There is no fallback branch: on a target that is neither Windows nor Unix no
// `sys` module is declared at all.
cfg_if::cfg_if! {
    if #[cfg(windows)] {
        #[path = "iocp/mod.rs"]
        mod sys;
    } else if #[cfg(all(target_os = "linux", feature = "polling", feature = "io-uring"))] {
        #[path = "fusion/mod.rs"]
        mod sys;
    } else if #[cfg(all(target_os = "linux", feature = "io-uring"))] {
        #[path = "iour/mod.rs"]
        mod sys;
    } else if #[cfg(unix)] {
        #[path = "poll/mod.rs"]
        mod sys;
    }
}
58
59pub use sys::*;
60
/// Helper macro (Windows) that evaluates a raw system call expression and
/// converts its return value into an [`io::Result`](std::io::Result).
///
/// The expression is evaluated inside an `unsafe` block supplied by the macro.
/// On failure the error is taken from
/// [`io::Error::last_os_error`](std::io::Error::last_os_error); on success the
/// raw return value is handed back unchanged in `Ok`.
///
/// Forms:
/// * `syscall!(BOOL, expr)` - a return value equal to `0` is a failure.
/// * `syscall!(SOCKET, expr)` - a return value different from `0` is a failure.
/// * `syscall!(HANDLE, expr)` - a return value equal to
///   `INVALID_HANDLE_VALUE` is a failure.
/// * `syscall!(expr, <op> <rhs>)` - generic form; `expr <op> <rhs>` being true
///   marks the failure case.
#[cfg(windows)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
    // Zero means failure.
    (BOOL, $e:expr) => {
        $crate::syscall!($e, == 0)
    };
    // Anything other than zero means failure.
    (SOCKET, $e:expr) => {
        $crate::syscall!($e, != 0)
    };
    // The invalid-handle sentinel means failure.
    (HANDLE, $e:expr) => {
        $crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
    };
    // Generic form all other arms delegate to.
    ($e:expr, $op: tt $rhs: expr) => {{
        // The caller may already be inside an `unsafe` context, in which case
        // the block below would otherwise trigger a lint.
        #[allow(unused_unsafe)]
        let res = unsafe { $e };
        if res $op $rhs {
            Err(::std::io::Error::last_os_error())
        } else {
            Ok(res)
        }
    }};
}
84
/// Helper macro (Unix) that evaluates a raw system call expression and
/// converts its return value into a Rust result.
///
/// Forms:
/// * `syscall!(expr)` - evaluates `expr` inside an `unsafe` block; a return
///   value of `-1` becomes `Err(io::Error::last_os_error())`, anything else is
///   returned as `Ok(res)`.
/// * `syscall!(break expr)` - repeats `syscall!(expr)` in a loop and yields a
///   `Poll<io::Result<usize>>`: success is `Poll::Ready(Ok(res as usize))`,
///   `WouldBlock` or `EINPROGRESS` is `Poll::Pending`, `Interrupted` retries
///   the call, and every other error is `Poll::Ready(Err(e))`.
/// * `syscall!(expr, variant(fd))` - runs the `break` form and maps it to a
///   `Result` holding a `Decision`: `Pending` becomes
///   `Ok(Decision::variant(fd))`, a ready success becomes
///   `Ok(Decision::Completed(res))`, and a ready error is returned as `Err`.
#[cfg(unix)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
    (break $e:expr) => {
        loop {
            match $crate::syscall!($e) {
                Ok(fd) => break ::std::task::Poll::Ready(Ok(fd as usize)),
                // The call could not finish right now: report "not ready yet".
                Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS)
                    => break ::std::task::Poll::Pending,
                // Interrupted: fall through to the next loop iteration and retry.
                Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => {},
                Err(e) => break ::std::task::Poll::Ready(Err(e)),
            }
        }
    };
    ($e:expr, $f:ident($fd:expr)) => {
        match $crate::syscall!(break $e) {
            ::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)),
            ::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)),
            ::std::task::Poll::Ready(Err(e)) => Err(e),
        }
    };
    ($e:expr) => {{
        // The caller may already be inside an `unsafe` context, in which case
        // the block below would otherwise trigger a lint.
        #[allow(unused_unsafe)]
        let res = unsafe { $e };
        if res == -1 {
            Err(::std::io::Error::last_os_error())
        } else {
            Ok(res)
        }
    }};
}
118
/// Generates the raw-descriptor trait implementations for a wrapper type
/// `$t` that stores its descriptor-like value in the field `$inner`.
///
/// Forms:
/// * `impl_raw_fd!(T, Inner, field)` - implements `AsRawFd` and
///   `ToSharedFd<Inner>` for `T` by delegating to `self.field`, and, on Unix
///   only, `std::os::fd::FromRawFd` by building `T` from that single field.
/// * `impl_raw_fd!(T, Inner, field, file)` - everything from the basic form
///   plus, on Windows only, `FromRawHandle` and `AsRawHandle`.
/// * `impl_raw_fd!(T, Inner, field, socket)` - everything from the basic form
///   plus, on Windows only, `FromRawSocket` and `AsRawSocket`.
///
/// The generated `from_raw_*` constructors use `Self { field: ... }`, so `T`
/// must be constructible from that one field alone.
#[macro_export]
#[doc(hidden)]
macro_rules! impl_raw_fd {
    // Basic form: platform independent traits plus the Unix constructor.
    ($t:ty, $it:ty, $inner:ident) => {
        impl $crate::AsRawFd for $t {
            fn as_raw_fd(&self) -> $crate::RawFd {
                self.$inner.as_raw_fd()
            }
        }
        #[cfg(unix)]
        impl std::os::fd::FromRawFd for $t {
            unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self {
                Self {
                    $inner: std::os::fd::FromRawFd::from_raw_fd(fd),
                }
            }
        }
        impl $crate::ToSharedFd<$it> for $t {
            fn to_shared_fd(&self) -> $crate::SharedFd<$it> {
                self.$inner.to_shared_fd()
            }
        }
    };
    // File-like wrapper: additionally expose the Windows raw-handle traits.
    ($t:ty, $it:ty, $inner:ident,file) => {
        $crate::impl_raw_fd!($t, $it, $inner);
        #[cfg(windows)]
        impl std::os::windows::io::FromRawHandle for $t {
            unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
                Self {
                    $inner: std::os::windows::io::FromRawHandle::from_raw_handle(handle),
                }
            }
        }
        #[cfg(windows)]
        impl std::os::windows::io::AsRawHandle for $t {
            fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
                self.$inner.as_raw_handle()
            }
        }
    };
    // Socket-like wrapper: additionally expose the Windows raw-socket traits.
    ($t:ty, $it:ty, $inner:ident,socket) => {
        $crate::impl_raw_fd!($t, $it, $inner);
        #[cfg(windows)]
        impl std::os::windows::io::FromRawSocket for $t {
            unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self {
                Self {
                    $inner: std::os::windows::io::FromRawSocket::from_raw_socket(sock),
                }
            }
        }
        #[cfg(windows)]
        impl std::os::windows::io::AsRawSocket for $t {
            fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
                self.$inner.as_raw_socket()
            }
        }
    };
}
177
/// A value that is either still pending (carrying a `K`) or already ready
/// (carrying an `R`).
///
/// This is the return type of [`Proactor::push`] and [`Proactor::pop`], where
/// `K` is the key of the operation and `R` is its result.
pub enum PushEntry<K, R> {
    /// Not finished yet; holds the pending-side value.
    Pending(K),
    /// Finished; holds the ready-side value.
    Ready(R),
}

impl<K, R> PushEntry<K, R> {
    /// Returns `true` if the entry is [`PushEntry::Ready`].
    pub const fn is_ready(&self) -> bool {
        match self {
            Self::Ready(_) => true,
            Self::Pending(_) => false,
        }
    }

    /// Consumes the entry and returns the ready value, or `None` when the
    /// entry is still pending (the pending value is dropped).
    pub fn take_ready(self) -> Option<R> {
        if let Self::Ready(value) = self {
            Some(value)
        } else {
            None
        }
    }

    /// Transforms the pending value with `f`; a ready entry is passed through
    /// untouched and `f` is not called.
    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
        match self {
            Self::Ready(value) => PushEntry::Ready(value),
            Self::Pending(key) => PushEntry::Pending(f(key)),
        }
    }

    /// Transforms the ready value with `f`; a pending entry is passed through
    /// untouched and `f` is not called.
    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
        match self {
            Self::Ready(value) => PushEntry::Ready(f(value)),
            Self::Pending(key) => PushEntry::Pending(key),
        }
    }
}
216
217pub struct Proactor {
220 driver: Driver,
221}
222
223impl Proactor {
224 pub fn new() -> io::Result<Self> {
226 Self::builder().build()
227 }
228
229 pub fn builder() -> ProactorBuilder {
231 ProactorBuilder::new()
232 }
233
234 fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
235 Ok(Self {
236 driver: Driver::new(builder)?,
237 })
238 }
239
240 pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
248 self.driver.attach(fd)
249 }
250
251 pub fn cancel<T: OpCode>(&mut self, mut op: Key<T>) -> Option<BufResult<usize, T>> {
257 instrument!(compio_log::Level::DEBUG, "cancel", ?op);
258 if op.set_cancelled() {
259 Some(unsafe { op.into_inner() })
261 } else {
262 self.driver
263 .cancel(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) });
264 None
265 }
266 }
267
268 pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
271 let mut op = self.driver.create_op(op);
272 match self
273 .driver
274 .push(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) })
275 {
276 Poll::Pending => PushEntry::Pending(op),
277 Poll::Ready(res) => {
278 op.set_result(res);
279 PushEntry::Ready(unsafe { op.into_inner() })
281 }
282 }
283 }
284
285 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
289 unsafe { self.driver.poll(timeout) }
290 }
291
292 pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
298 instrument!(compio_log::Level::DEBUG, "pop", ?op);
299 if op.has_result() {
300 let flags = op.flags();
301 PushEntry::Ready((unsafe { op.into_inner() }, flags))
303 } else {
304 PushEntry::Pending(op)
305 }
306 }
307
308 pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: Waker) {
310 op.set_waker(waker);
311 }
312
313 pub fn handle(&self) -> io::Result<NotifyHandle> {
315 self.driver.handle()
316 }
317}
318
319impl AsRawFd for Proactor {
320 fn as_raw_fd(&self) -> RawFd {
321 self.driver.as_raw_fd()
322 }
323}
324
325#[derive(Debug)]
327pub(crate) struct Entry {
328 user_data: usize,
329 result: io::Result<usize>,
330 flags: u32,
331}
332
333impl Entry {
334 pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
335 Self {
336 user_data,
337 result,
338 flags: 0,
339 }
340 }
341
342 #[cfg(all(target_os = "linux", feature = "io-uring"))]
343 pub(crate) fn set_flags(&mut self, flags: u32) {
345 self.flags = flags;
346 }
347
348 pub fn user_data(&self) -> usize {
350 self.user_data
351 }
352
353 pub fn flags(&self) -> u32 {
354 self.flags
355 }
356
357 pub fn into_result(self) -> io::Result<usize> {
359 self.result
360 }
361
362 pub unsafe fn notify(self) {
364 let user_data = self.user_data();
365 let mut op = Key::<()>::new_unchecked(user_data);
366 op.set_flags(self.flags());
367 if op.set_result(self.into_result()) {
368 let _ = op.into_box();
370 }
371 }
372}
373
374#[derive(Debug, Clone)]
375enum ThreadPoolBuilder {
376 Create { limit: usize, recv_limit: Duration },
377 Reuse(AsyncifyPool),
378}
379
380impl Default for ThreadPoolBuilder {
381 fn default() -> Self {
382 Self::new()
383 }
384}
385
386impl ThreadPoolBuilder {
387 pub fn new() -> Self {
388 Self::Create {
389 limit: 256,
390 recv_limit: Duration::from_secs(60),
391 }
392 }
393
394 pub fn create_or_reuse(&self) -> AsyncifyPool {
395 match self {
396 Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
397 Self::Reuse(pool) => pool.clone(),
398 }
399 }
400}
401
402#[derive(Debug, Clone)]
404pub struct ProactorBuilder {
405 capacity: u32,
406 pool_builder: ThreadPoolBuilder,
407 sqpoll_idle: Option<Duration>,
408}
409
410impl Default for ProactorBuilder {
411 fn default() -> Self {
412 Self::new()
413 }
414}
415
416impl ProactorBuilder {
417 pub fn new() -> Self {
419 Self {
420 capacity: 1024,
421 pool_builder: ThreadPoolBuilder::new(),
422 sqpoll_idle: None,
423 }
424 }
425
426 pub fn capacity(&mut self, capacity: u32) -> &mut Self {
429 self.capacity = capacity;
430 self
431 }
432
433 pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
438 if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
439 *limit = value;
440 }
441 self
442 }
443
444 pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
449 if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
450 *recv_limit = timeout;
451 }
452 self
453 }
454
455 pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
457 self.pool_builder = ThreadPoolBuilder::Reuse(pool);
458 self
459 }
460
461 pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
464 self.reuse_thread_pool(self.create_or_get_thread_pool());
465 self
466 }
467
468 pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
470 self.pool_builder.create_or_reuse()
471 }
472
473 pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
482 self.sqpoll_idle = Some(idle);
483 self
484 }
485
486 pub fn build(&self) -> io::Result<Proactor> {
488 Proactor::with_builder(self)
489 }
490}