compio_driver/
key.rs

1use std::{io, marker::PhantomData, mem::MaybeUninit, pin::Pin, task::Waker};
2
3use compio_buf::BufResult;
4
5use crate::{OpCode, Overlapped, PushEntry, RawFd};
6
7/// An operation with other needed information. It should be allocated on the
8/// heap. The pointer to this struct is used as `user_data`, and on Windows, it
9/// is used as the pointer to `OVERLAPPED`.
10///
11/// `*const RawOp<dyn OpCode>` can be obtained from any `Key<T: OpCode>` by
12/// first casting `Key::user_data` to `*const RawOp<()>`, then upcasted with
13/// `upcast_fn`. It is done in [`Key::as_op_pin`].
14#[repr(C)]
15pub(crate) struct RawOp<T: ?Sized> {
16    header: Overlapped,
17    // The cancelled flag and the result here are manual reference counting. The driver holds the
18    // strong ref until it completes; the runtime holds the strong ref until the future is
19    // dropped.
20    cancelled: bool,
21    // The metadata in `*mut RawOp<dyn OpCode>`
22    metadata: usize,
23    result: PushEntry<Option<Waker>, io::Result<usize>>,
24    flags: u32,
25    op: T,
26}
27
28#[repr(C)]
29union OpCodePtrRepr {
30    ptr: *mut RawOp<dyn OpCode>,
31    components: OpCodePtrComponents,
32}
33
34#[repr(C)]
35#[derive(Clone, Copy)]
36struct OpCodePtrComponents {
37    data_pointer: *mut (),
38    metadata: usize,
39}
40
41fn opcode_metadata<T: OpCode + 'static>() -> usize {
42    let mut op = MaybeUninit::<RawOp<T>>::uninit();
43    // SAFETY: same as `core::ptr::metadata`.
44    unsafe {
45        OpCodePtrRepr {
46            ptr: op.as_mut_ptr(),
47        }
48        .components
49        .metadata
50    }
51}
52
53const unsafe fn opcode_dyn_mut(ptr: *mut (), metadata: usize) -> *mut RawOp<dyn OpCode> {
54    OpCodePtrRepr {
55        components: OpCodePtrComponents {
56            data_pointer: ptr,
57            metadata,
58        },
59    }
60    .ptr
61}
62
63/// A typed wrapper for key of Ops submitted into driver. It doesn't free the
64/// inner on dropping. Instead, the memory is managed by the proactor. The inner
65/// is only freed when:
66///
67/// 1. The op is completed and the future asks the result. `into_inner` will be
68///    called by the proactor.
69/// 2. The op is completed and the future cancels it. `into_box` will be called
70///    by the proactor.
71#[derive(PartialEq, Eq, Hash)]
72pub struct Key<T: ?Sized> {
73    user_data: *mut (),
74    _p: PhantomData<Box<RawOp<T>>>,
75}
76
77impl<T: ?Sized> Unpin for Key<T> {}
78
79impl<T: OpCode + 'static> Key<T> {
80    /// Create [`RawOp`] and get the [`Key`] to it.
81    pub(crate) fn new(driver: RawFd, op: T) -> Self {
82        let header = Overlapped::new(driver);
83        let raw_op = Box::new(RawOp {
84            header,
85            cancelled: false,
86            metadata: opcode_metadata::<T>(),
87            result: PushEntry::Pending(None),
88            flags: 0,
89            op,
90        });
91        unsafe { Self::new_unchecked(Box::into_raw(raw_op) as _) }
92    }
93}
94
95impl<T: ?Sized> Key<T> {
96    /// Create a new `Key` with the given user data.
97    ///
98    /// # Safety
99    ///
100    /// Caller needs to ensure that `T` does correspond to `user_data` in driver
101    /// this `Key` is created with. In most cases, it is enough to let `T` be
102    /// `dyn OpCode`.
103    pub unsafe fn new_unchecked(user_data: usize) -> Self {
104        Self {
105            user_data: user_data as _,
106            _p: PhantomData,
107        }
108    }
109
110    /// Get the unique user-defined data.
111    pub fn user_data(&self) -> usize {
112        self.user_data as _
113    }
114
115    fn as_opaque(&self) -> &RawOp<()> {
116        // SAFETY: user_data is unique and RawOp is repr(C).
117        unsafe { &*(self.user_data as *const RawOp<()>) }
118    }
119
120    fn as_opaque_mut(&mut self) -> &mut RawOp<()> {
121        // SAFETY: see `as_opaque`.
122        unsafe { &mut *(self.user_data as *mut RawOp<()>) }
123    }
124
125    fn as_dyn_mut_ptr(&mut self) -> *mut RawOp<dyn OpCode> {
126        let user_data = self.user_data;
127        let this = self.as_opaque_mut();
128        // SAFETY: metadata from `Key::new`.
129        unsafe { opcode_dyn_mut(user_data, this.metadata) }
130    }
131
132    /// A pointer to OVERLAPPED.
133    #[cfg(windows)]
134    pub(crate) fn as_mut_ptr(&mut self) -> *mut Overlapped {
135        &mut self.as_opaque_mut().header
136    }
137
138    /// Cancel the op, decrease the ref count. The return value indicates if the
139    /// op is completed. If so, the op should be dropped because it is
140    /// useless.
141    pub(crate) fn set_cancelled(&mut self) -> bool {
142        self.as_opaque_mut().cancelled = true;
143        self.has_result()
144    }
145
146    /// Complete the op, decrease the ref count. Wake the future if a waker is
147    /// set. The return value indicates if the op is cancelled. If so, the
148    /// op should be dropped because it is useless.
149    pub(crate) fn set_result(&mut self, res: io::Result<usize>) -> bool {
150        let this = unsafe { &mut *self.as_dyn_mut_ptr() };
151        #[cfg(all(target_os = "linux", feature = "io-uring"))]
152        if let Ok(res) = res {
153            unsafe {
154                Pin::new_unchecked(&mut this.op).set_result(res);
155            }
156        }
157        if let PushEntry::Pending(Some(w)) =
158            std::mem::replace(&mut this.result, PushEntry::Ready(res))
159        {
160            w.wake();
161        }
162        this.cancelled
163    }
164
165    pub(crate) fn set_flags(&mut self, flags: u32) {
166        self.as_opaque_mut().flags = flags;
167    }
168
169    pub(crate) fn flags(&self) -> u32 {
170        self.as_opaque().flags
171    }
172
173    /// Whether the op is completed.
174    pub(crate) fn has_result(&self) -> bool {
175        self.as_opaque().result.is_ready()
176    }
177
178    /// Set waker of the current future.
179    pub(crate) fn set_waker(&mut self, waker: Waker) {
180        if let PushEntry::Pending(w) = &mut self.as_opaque_mut().result {
181            *w = Some(waker)
182        }
183    }
184
185    /// Get the inner [`RawOp`]. It is usually used to drop the inner
186    /// immediately, without knowing about the inner `T`.
187    ///
188    /// # Safety
189    ///
190    /// Call it only when the op is cancelled and completed, which is the case
191    /// when the ref count becomes zero. See doc of [`Key::set_cancelled`]
192    /// and [`Key::set_result`].
193    pub(crate) unsafe fn into_box(mut self) -> Box<RawOp<dyn OpCode>> {
194        Box::from_raw(self.as_dyn_mut_ptr())
195    }
196}
197
198impl<T> Key<T> {
199    /// Get the inner result if it is completed.
200    ///
201    /// # Safety
202    ///
203    /// Call it only when the op is completed, otherwise it is UB.
204    pub(crate) unsafe fn into_inner(self) -> BufResult<usize, T> {
205        let op = unsafe { Box::from_raw(self.user_data as *mut RawOp<T>) };
206        BufResult(op.result.take_ready().unwrap_unchecked(), op.op)
207    }
208}
209
210impl<T: OpCode + ?Sized> Key<T> {
211    /// Pin the inner op.
212    pub(crate) fn as_op_pin(&mut self) -> Pin<&mut dyn OpCode> {
213        // SAFETY: the inner won't be moved.
214        unsafe {
215            let this = &mut *self.as_dyn_mut_ptr();
216            Pin::new_unchecked(&mut this.op)
217        }
218    }
219
220    /// Call [`OpCode::operate`] and assume that it is not an overlapped op,
221    /// which means it never returns [`Poll::Pending`].
222    ///
223    /// [`Poll::Pending`]: std::task::Poll::Pending
224    #[cfg(windows)]
225    pub(crate) fn operate_blocking(&mut self) -> io::Result<usize> {
226        use std::task::Poll;
227
228        let optr = self.as_mut_ptr();
229        let op = self.as_op_pin();
230        let res = unsafe { op.operate(optr.cast()) };
231        match res {
232            Poll::Pending => unreachable!("this operation is not overlapped"),
233            Poll::Ready(res) => res,
234        }
235    }
236}
237
238impl<T: ?Sized> std::fmt::Debug for Key<T> {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        write!(f, "Key({})", self.user_data())
241    }
242}