1#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
2#[allow(unused_imports)]
3pub use std::os::fd::{AsRawFd, OwnedFd, RawFd};
4#[cfg(aio)]
5use std::ptr::NonNull;
6use std::{
7 collections::{HashMap, VecDeque},
8 io,
9 num::NonZeroUsize,
10 os::fd::BorrowedFd,
11 pin::Pin,
12 sync::Arc,
13 task::Poll,
14 time::Duration,
15};
16
17use compio_log::{instrument, trace};
18use crossbeam_queue::SegQueue;
19pub(crate) use libc::{sockaddr_storage, socklen_t};
20use polling::{Event, Events, Poller};
21
22use crate::{AsyncifyPool, Entry, Key, ProactorBuilder, op::Interest, syscall};
23
24pub(crate) mod op;
25
26pub trait OpCode {
28 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;
31
32 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
34 None
35 }
36
37 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
41}
42
43#[non_exhaustive]
45pub enum Decision {
46 Completed(usize),
48 Wait(WaitArg),
50 Blocking,
52 #[cfg(aio)]
54 Aio(AioControl),
55}
56
57impl Decision {
58 pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
60 Self::Wait(WaitArg { fd, interest })
61 }
62
63 pub fn wait_readable(fd: RawFd) -> Self {
65 Self::wait_for(fd, Interest::Readable)
66 }
67
68 pub fn wait_writable(fd: RawFd) -> Self {
70 Self::wait_for(fd, Interest::Writable)
71 }
72
73 #[cfg(aio)]
75 pub fn aio(
76 cb: &mut libc::aiocb,
77 submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
78 ) -> Self {
79 Self::Aio(AioControl {
80 aiocbp: NonNull::from(cb),
81 submit,
82 })
83 }
84}
85
86#[derive(Debug, Clone, Copy)]
88pub struct WaitArg {
89 pub fd: RawFd,
91 pub interest: Interest,
93}
94
95#[cfg(aio)]
97#[derive(Debug, Clone, Copy)]
98pub struct AioControl {
99 pub aiocbp: NonNull<libc::aiocb>,
101 pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
103}
104
105#[derive(Debug, Default)]
106struct FdQueue {
107 read_queue: VecDeque<usize>,
108 write_queue: VecDeque<usize>,
109}
110
111impl FdQueue {
112 pub fn push_back_interest(&mut self, user_data: usize, interest: Interest) {
113 match interest {
114 Interest::Readable => self.read_queue.push_back(user_data),
115 Interest::Writable => self.write_queue.push_back(user_data),
116 }
117 }
118
119 pub fn push_front_interest(&mut self, user_data: usize, interest: Interest) {
120 match interest {
121 Interest::Readable => self.read_queue.push_front(user_data),
122 Interest::Writable => self.write_queue.push_front(user_data),
123 }
124 }
125
126 pub fn remove(&mut self, user_data: usize) {
127 self.read_queue.retain(|&k| k != user_data);
128 self.write_queue.retain(|&k| k != user_data);
129 }
130
131 pub fn event(&self) -> Event {
132 let mut event = Event::none(0);
133 if let Some(&key) = self.read_queue.front() {
134 event.readable = true;
135 event.key = key;
136 }
137 if let Some(&key) = self.write_queue.front() {
138 event.writable = true;
139 event.key = key;
140 }
141 event
142 }
143
144 pub fn pop_interest(&mut self, event: &Event) -> Option<(usize, Interest)> {
145 if event.readable {
146 if let Some(user_data) = self.read_queue.pop_front() {
147 return Some((user_data, Interest::Readable));
148 }
149 }
150 if event.writable {
151 if let Some(user_data) = self.write_queue.pop_front() {
152 return Some((user_data, Interest::Writable));
153 }
154 }
155 None
156 }
157}
158
159#[non_exhaustive]
162pub enum OpType {
163 Fd(RawFd),
165 #[cfg(aio)]
167 Aio(NonNull<libc::aiocb>),
168}
169
170pub(crate) struct Driver {
172 events: Events,
173 poll: Arc<Poller>,
174 registry: HashMap<RawFd, FdQueue>,
175 pool: AsyncifyPool,
176 pool_completed: Arc<SegQueue<Entry>>,
177}
178
179impl Driver {
180 pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
181 instrument!(compio_log::Level::TRACE, "new", ?builder);
182 trace!("new poll driver");
183 let entries = builder.capacity as usize; let events = if entries == 0 {
185 Events::new()
186 } else {
187 Events::with_capacity(NonZeroUsize::new(entries).unwrap())
188 };
189
190 let poll = Arc::new(Poller::new()?);
191
192 Ok(Self {
193 events,
194 poll,
195 registry: HashMap::new(),
196 pool: builder.create_or_get_thread_pool(),
197 pool_completed: Arc::new(SegQueue::new()),
198 })
199 }
200
201 pub fn create_op<T: crate::sys::OpCode + 'static>(&self, op: T) -> Key<T> {
202 Key::new(self.as_raw_fd(), op)
203 }
204
205 unsafe fn submit(&mut self, user_data: usize, arg: WaitArg) -> io::Result<()> {
208 let need_add = !self.registry.contains_key(&arg.fd);
209 let queue = self.registry.entry(arg.fd).or_default();
210 queue.push_back_interest(user_data, arg.interest);
211 let event = queue.event();
212 if need_add {
213 self.poll.add(arg.fd, event)?;
214 } else {
215 let fd = BorrowedFd::borrow_raw(arg.fd);
216 self.poll.modify(fd, event)?;
217 }
218 Ok(())
219 }
220
221 fn renew(
222 poll: &Poller,
223 registry: &mut HashMap<RawFd, FdQueue>,
224 fd: BorrowedFd,
225 renew_event: Event,
226 ) -> io::Result<()> {
227 if !renew_event.readable && !renew_event.writable {
228 poll.delete(fd)?;
229 registry.remove(&fd.as_raw_fd());
230 } else {
231 poll.modify(fd, renew_event)?;
232 }
233 Ok(())
234 }
235
236 pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
237 Ok(())
238 }
239
240 pub fn cancel(&mut self, op: &mut Key<dyn crate::sys::OpCode>) {
241 let op_pin = op.as_op_pin();
242 match op_pin.op_type() {
243 None => {}
244 Some(OpType::Fd(fd)) => {
245 let queue = self
246 .registry
247 .get_mut(&fd)
248 .expect("the fd should be attached");
249 queue.remove(op.user_data());
250 let renew_event = queue.event();
251 if Self::renew(
252 &self.poll,
253 &mut self.registry,
254 unsafe { BorrowedFd::borrow_raw(fd) },
255 renew_event,
256 )
257 .is_ok()
258 {
259 self.pool_completed.push(entry_cancelled(op.user_data()));
260 }
261 }
262 #[cfg(aio)]
263 Some(OpType::Aio(aiocbp)) => {
264 let aiocb = unsafe { aiocbp.as_ref() };
265 let fd = aiocb.aio_fildes;
266 syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
267 }
268 }
269 }
270
271 pub fn push(&mut self, op: &mut Key<dyn crate::sys::OpCode>) -> Poll<io::Result<usize>> {
272 instrument!(compio_log::Level::TRACE, "push", ?op);
273 let user_data = op.user_data();
274 let op_pin = op.as_op_pin();
275 match op_pin.pre_submit()? {
276 Decision::Wait(arg) => {
277 unsafe {
279 self.submit(user_data, arg)?;
280 }
281 trace!("register {:?}", arg);
282 Poll::Pending
283 }
284 Decision::Completed(res) => Poll::Ready(Ok(res)),
285 Decision::Blocking => self.push_blocking(user_data),
286 #[cfg(aio)]
287 Decision::Aio(AioControl { mut aiocbp, submit }) => {
288 let aiocb = unsafe { aiocbp.as_mut() };
289 #[cfg(freebsd)]
290 {
291 aiocb.aio_sigevent.sigev_signo = self.poll.as_raw_fd();
293 aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
294 aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
295 }
296 #[cfg(solarish)]
297 let mut notify = libc::port_notify {
298 portnfy_port: self.poll.as_raw_fd(),
299 portnfy_user: user_data as _,
300 };
301 #[cfg(solarish)]
302 {
303 aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
304 aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
305 }
306 match syscall!(submit(aiocbp.as_ptr())) {
307 Ok(_) => Poll::Pending,
308 Err(e)
316 if matches!(
317 e.raw_os_error(),
318 Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
319 ) =>
320 {
321 self.push_blocking(user_data)
322 }
323 Err(e) => Poll::Ready(Err(e)),
324 }
325 }
326 }
327 }
328
329 fn push_blocking(&mut self, user_data: usize) -> Poll<io::Result<usize>> {
330 let poll = self.poll.clone();
331 let completed = self.pool_completed.clone();
332 let mut closure = move || {
333 let mut op = unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
334 let op_pin = op.as_op_pin();
335 let res = match op_pin.operate() {
336 Poll::Pending => unreachable!("this operation is not non-blocking"),
337 Poll::Ready(res) => res,
338 };
339 completed.push(Entry::new(user_data, res));
340 poll.notify().ok();
341 };
342 loop {
343 match self.pool.dispatch(closure) {
344 Ok(()) => return Poll::Pending,
345 Err(e) => {
346 closure = e.0;
347 self.poll_blocking();
348 }
349 }
350 }
351 }
352
353 fn poll_blocking(&mut self) -> bool {
354 if self.pool_completed.is_empty() {
355 return false;
356 }
357 while let Some(entry) = self.pool_completed.pop() {
358 unsafe {
359 entry.notify();
360 }
361 }
362 true
363 }
364
365 pub unsafe fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
366 instrument!(compio_log::Level::TRACE, "poll", ?timeout);
367 if self.poll_blocking() {
368 return Ok(());
369 }
370 self.poll.wait(&mut self.events, timeout)?;
371 if self.events.is_empty() && timeout.is_some() {
372 return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
373 }
374 for event in self.events.iter() {
375 let user_data = event.key;
376 trace!("receive {} for {:?}", user_data, event);
377 let mut op = Key::<dyn crate::sys::OpCode>::new_unchecked(user_data);
378 let op = op.as_op_pin();
379 match op.op_type() {
380 None => {
381 trace!("op {} is completed", user_data);
384 }
385 Some(OpType::Fd(fd)) => {
386 let queue = self
389 .registry
390 .get_mut(&fd)
391 .expect("the fd should be attached");
392 if let Some((user_data, interest)) = queue.pop_interest(&event) {
393 let mut op = Key::<dyn crate::sys::OpCode>::new_unchecked(user_data);
394 let op = op.as_op_pin();
395 let res = match op.operate() {
396 Poll::Pending => {
397 queue.push_front_interest(user_data, interest);
399 None
400 }
401 Poll::Ready(res) => Some(res),
402 };
403 if let Some(res) = res {
404 Entry::new(user_data, res).notify();
405 }
406 }
407 let renew_event = queue.event();
408 Self::renew(
409 &self.poll,
410 &mut self.registry,
411 BorrowedFd::borrow_raw(fd),
412 renew_event,
413 )?;
414 }
415 #[cfg(aio)]
416 Some(OpType::Aio(aiocbp)) => {
417 let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
418 let res = match err {
419 libc::EINPROGRESS => {
424 trace!("op {} is not completed", user_data);
425 continue;
426 }
427 libc::ECANCELED => {
428 libc::aio_return(aiocbp.as_ptr());
430 Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
431 }
432 _ => syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize),
433 };
434 Entry::new(user_data, res).notify();
435 }
436 }
437 }
438 Ok(())
439 }
440
441 pub fn handle(&self) -> NotifyHandle {
442 NotifyHandle::new(self.poll.clone())
443 }
444}
445
446impl AsRawFd for Driver {
447 fn as_raw_fd(&self) -> RawFd {
448 self.poll.as_raw_fd()
449 }
450}
451
452impl Drop for Driver {
453 fn drop(&mut self) {
454 for fd in self.registry.keys() {
455 unsafe {
456 let fd = BorrowedFd::borrow_raw(*fd);
457 self.poll.delete(fd).ok();
458 }
459 }
460 }
461}
462
463fn entry_cancelled(user_data: usize) -> Entry {
464 Entry::new(
465 user_data,
466 Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)),
467 )
468}
469
470pub struct NotifyHandle {
472 poll: Arc<Poller>,
473}
474
475impl NotifyHandle {
476 fn new(poll: Arc<Poller>) -> Self {
477 Self { poll }
478 }
479
480 pub fn notify(&self) -> io::Result<()> {
482 self.poll.notify()
483 }
484}