hyper/rt/io.rs
1use std::fmt;
2use std::mem::MaybeUninit;
3use std::ops::DerefMut;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7// New IO traits? What?! Why, are you bonkers?
8//
9// I mean, yes, probably. But, here's the goals:
10//
11// 1. Supports poll-based IO operations.
12// 2. Opt-in vectored IO.
13// 3. Can use an optional buffer pool.
14// 4. Able to add completion-based (uring) IO eventually.
15//
16// Frankly, the last point is the entire reason we're doing this. We want to
17// have forwards-compatibility with an eventually stable io-uring runtime. We
18// don't need that to work right away. But it must be possible to add in here
19// without breaking hyper 1.0.
20//
21// While in here, if there's small tweaks to poll_read or poll_write that would
22// allow even the "slow" path to be faster, such as if someone didn't remember
23// to forward along an `is_completion` call.
24
25/// Reads bytes from a source.
26///
27/// This trait is similar to `std::io::Read`, but supports asynchronous reads.
28pub trait Read {
29 /// Attempts to read bytes into the `buf`.
30 ///
31 /// On success, returns `Poll::Ready(Ok(()))` and places data in the
32 /// unfilled portion of `buf`. If no data was read (`buf.remaining()` is
33 /// unchanged), it implies that EOF has been reached.
34 ///
35 /// If no data is available for reading, the method returns `Poll::Pending`
36 /// and arranges for the current task (via `cx.waker()`) to receive a
37 /// notification when the object becomes readable or is closed.
38 fn poll_read(
39 self: Pin<&mut Self>,
40 cx: &mut Context<'_>,
41 buf: ReadBufCursor<'_>,
42 ) -> Poll<Result<(), std::io::Error>>;
43}
44
45/// Write bytes asynchronously.
46///
47/// This trait is similar to `std::io::Write`, but for asynchronous writes.
48pub trait Write {
49 /// Attempt to write bytes from `buf` into the destination.
50 ///
51 /// On success, returns `Poll::Ready(Ok(num_bytes_written)))`. If
52 /// successful, it must be guaranteed that `n <= buf.len()`. A return value
53 /// of `0` means that the underlying object is no longer able to accept
54 /// bytes, or that the provided buffer is empty.
55 ///
56 /// If the object is not ready for writing, the method returns
57 /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to
58 /// receive a notification when the object becomes writable or is closed.
59 fn poll_write(
60 self: Pin<&mut Self>,
61 cx: &mut Context<'_>,
62 buf: &[u8],
63 ) -> Poll<Result<usize, std::io::Error>>;
64
65 /// Attempts to flush the object.
66 ///
67 /// On success, returns `Poll::Ready(Ok(()))`.
68 ///
69 /// If flushing cannot immediately complete, this method returns
70 /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to
71 /// receive a notification when the object can make progress.
72 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>>;
73
74 /// Attempts to shut down this writer.
75 fn poll_shutdown(
76 self: Pin<&mut Self>,
77 cx: &mut Context<'_>,
78 ) -> Poll<Result<(), std::io::Error>>;
79
80 /// Returns whether this writer has an efficient `poll_write_vectored`
81 /// implementation.
82 ///
83 /// The default implementation returns `false`.
84 fn is_write_vectored(&self) -> bool {
85 false
86 }
87
88 /// Like `poll_write`, except that it writes from a slice of buffers.
89 fn poll_write_vectored(
90 self: Pin<&mut Self>,
91 cx: &mut Context<'_>,
92 bufs: &[std::io::IoSlice<'_>],
93 ) -> Poll<Result<usize, std::io::Error>> {
94 let buf = bufs
95 .iter()
96 .find(|b| !b.is_empty())
97 .map_or(&[][..], |b| &**b);
98 self.poll_write(cx, buf)
99 }
100}
101
102/// A wrapper around a byte buffer that is incrementally filled and initialized.
103///
104/// This type is a sort of "double cursor". It tracks three regions in the
105/// buffer: a region at the beginning of the buffer that has been logically
106/// filled with data, a region that has been initialized at some point but not
107/// yet logically filled, and a region at the end that may be uninitialized.
108/// The filled region is guaranteed to be a subset of the initialized region.
109///
110/// In summary, the contents of the buffer can be visualized as:
111///
112/// ```not_rust
113/// [ capacity ]
114/// [ filled | unfilled ]
115/// [ initialized | uninitialized ]
116/// ```
117///
118/// It is undefined behavior to de-initialize any bytes from the uninitialized
119/// region, since it is merely unknown whether this region is uninitialized or
120/// not, and if part of it turns out to be initialized, it must stay initialized.
121pub struct ReadBuf<'a> {
122 raw: &'a mut [MaybeUninit<u8>],
123 filled: usize,
124 init: usize,
125}
126
127/// The cursor part of a [`ReadBuf`].
128///
129/// This is created by calling `ReadBuf::unfilled()`.
130#[derive(Debug)]
131pub struct ReadBufCursor<'a> {
132 buf: &'a mut ReadBuf<'a>,
133}
134
135impl<'data> ReadBuf<'data> {
136 /// Create a new `ReadBuf` with a slice of initialized bytes.
137 #[inline]
138 pub fn new(raw: &'data mut [u8]) -> Self {
139 let len = raw.len();
140 Self {
141 // SAFETY: We never de-init the bytes ourselves.
142 raw: unsafe { &mut *(raw as *mut [u8] as *mut [MaybeUninit<u8>]) },
143 filled: 0,
144 init: len,
145 }
146 }
147
148 /// Create a new `ReadBuf` with a slice of uninitialized bytes.
149 #[inline]
150 pub fn uninit(raw: &'data mut [MaybeUninit<u8>]) -> Self {
151 Self {
152 raw,
153 filled: 0,
154 init: 0,
155 }
156 }
157
158 /// Get a slice of the buffer that has been filled in with bytes.
159 #[inline]
160 pub fn filled(&self) -> &[u8] {
161 // SAFETY: We only slice the filled part of the buffer, which is always valid
162 unsafe { &*(&self.raw[0..self.filled] as *const [MaybeUninit<u8>] as *const [u8]) }
163 }
164
165 /// Get a cursor to the unfilled portion of the buffer.
166 #[inline]
167 pub fn unfilled<'cursor>(&'cursor mut self) -> ReadBufCursor<'cursor> {
168 ReadBufCursor {
169 // SAFETY: self.buf is never re-assigned, so its safe to narrow
170 // the lifetime.
171 buf: unsafe {
172 std::mem::transmute::<&'cursor mut ReadBuf<'data>, &'cursor mut ReadBuf<'cursor>>(
173 self,
174 )
175 },
176 }
177 }
178
179 #[inline]
180 #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
181 pub(crate) unsafe fn set_init(&mut self, n: usize) {
182 self.init = self.init.max(n);
183 }
184
185 #[inline]
186 #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
187 pub(crate) unsafe fn set_filled(&mut self, n: usize) {
188 self.filled = self.filled.max(n);
189 }
190
191 #[inline]
192 #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
193 pub(crate) fn len(&self) -> usize {
194 self.filled
195 }
196
197 #[inline]
198 #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
199 pub(crate) fn init_len(&self) -> usize {
200 self.init
201 }
202
203 #[inline]
204 fn remaining(&self) -> usize {
205 self.capacity() - self.filled
206 }
207
208 #[inline]
209 fn capacity(&self) -> usize {
210 self.raw.len()
211 }
212}
213
214impl fmt::Debug for ReadBuf<'_> {
215 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216 f.debug_struct("ReadBuf")
217 .field("filled", &self.filled)
218 .field("init", &self.init)
219 .field("capacity", &self.capacity())
220 .finish()
221 }
222}
223
224impl ReadBufCursor<'_> {
225 /// Access the unfilled part of the buffer.
226 ///
227 /// # Safety
228 ///
229 /// The caller must not uninitialize any bytes that may have been
230 /// initialized before.
231 #[inline]
232 pub unsafe fn as_mut(&mut self) -> &mut [MaybeUninit<u8>] {
233 &mut self.buf.raw[self.buf.filled..]
234 }
235
236 /// Advance the `filled` cursor by `n` bytes.
237 ///
238 /// # Safety
239 ///
240 /// The caller must take care that `n` more bytes have been initialized.
241 #[inline]
242 pub unsafe fn advance(&mut self, n: usize) {
243 self.buf.filled = self.buf.filled.checked_add(n).expect("overflow");
244 self.buf.init = self.buf.filled.max(self.buf.init);
245 }
246
247 /// Returns the number of bytes that can be written from the current
248 /// position until the end of the buffer is reached.
249 ///
250 /// This value is equal to the length of the slice returned by `as_mut()``.
251 #[inline]
252 pub fn remaining(&self) -> usize {
253 self.buf.remaining()
254 }
255
256 /// Transfer bytes into `self`` from `src` and advance the cursor
257 /// by the number of bytes written.
258 ///
259 /// # Panics
260 ///
261 /// `self` must have enough remaining capacity to contain all of `src`.
262 #[inline]
263 pub fn put_slice(&mut self, src: &[u8]) {
264 assert!(
265 self.buf.remaining() >= src.len(),
266 "src.len() must fit in remaining()"
267 );
268
269 let amt = src.len();
270 // Cannot overflow, asserted above
271 let end = self.buf.filled + amt;
272
273 // Safety: the length is asserted above
274 unsafe {
275 self.buf.raw[self.buf.filled..end]
276 .as_mut_ptr()
277 .cast::<u8>()
278 .copy_from_nonoverlapping(src.as_ptr(), amt);
279 }
280
281 if self.buf.init < end {
282 self.buf.init = end;
283 }
284 self.buf.filled = end;
285 }
286}
287
288macro_rules! deref_async_read {
289 () => {
290 fn poll_read(
291 mut self: Pin<&mut Self>,
292 cx: &mut Context<'_>,
293 buf: ReadBufCursor<'_>,
294 ) -> Poll<std::io::Result<()>> {
295 Pin::new(&mut **self).poll_read(cx, buf)
296 }
297 };
298}
299
300impl<T: ?Sized + Read + Unpin> Read for Box<T> {
301 deref_async_read!();
302}
303
304impl<T: ?Sized + Read + Unpin> Read for &mut T {
305 deref_async_read!();
306}
307
308impl<P> Read for Pin<P>
309where
310 P: DerefMut,
311 P::Target: Read,
312{
313 fn poll_read(
314 self: Pin<&mut Self>,
315 cx: &mut Context<'_>,
316 buf: ReadBufCursor<'_>,
317 ) -> Poll<std::io::Result<()>> {
318 pin_as_deref_mut(self).poll_read(cx, buf)
319 }
320}
321
322macro_rules! deref_async_write {
323 () => {
324 fn poll_write(
325 mut self: Pin<&mut Self>,
326 cx: &mut Context<'_>,
327 buf: &[u8],
328 ) -> Poll<std::io::Result<usize>> {
329 Pin::new(&mut **self).poll_write(cx, buf)
330 }
331
332 fn poll_write_vectored(
333 mut self: Pin<&mut Self>,
334 cx: &mut Context<'_>,
335 bufs: &[std::io::IoSlice<'_>],
336 ) -> Poll<std::io::Result<usize>> {
337 Pin::new(&mut **self).poll_write_vectored(cx, bufs)
338 }
339
340 fn is_write_vectored(&self) -> bool {
341 (**self).is_write_vectored()
342 }
343
344 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
345 Pin::new(&mut **self).poll_flush(cx)
346 }
347
348 fn poll_shutdown(
349 mut self: Pin<&mut Self>,
350 cx: &mut Context<'_>,
351 ) -> Poll<std::io::Result<()>> {
352 Pin::new(&mut **self).poll_shutdown(cx)
353 }
354 };
355}
356
357impl<T: ?Sized + Write + Unpin> Write for Box<T> {
358 deref_async_write!();
359}
360
361impl<T: ?Sized + Write + Unpin> Write for &mut T {
362 deref_async_write!();
363}
364
365impl<P> Write for Pin<P>
366where
367 P: DerefMut,
368 P::Target: Write,
369{
370 fn poll_write(
371 self: Pin<&mut Self>,
372 cx: &mut Context<'_>,
373 buf: &[u8],
374 ) -> Poll<std::io::Result<usize>> {
375 pin_as_deref_mut(self).poll_write(cx, buf)
376 }
377
378 fn poll_write_vectored(
379 self: Pin<&mut Self>,
380 cx: &mut Context<'_>,
381 bufs: &[std::io::IoSlice<'_>],
382 ) -> Poll<std::io::Result<usize>> {
383 pin_as_deref_mut(self).poll_write_vectored(cx, bufs)
384 }
385
386 fn is_write_vectored(&self) -> bool {
387 (**self).is_write_vectored()
388 }
389
390 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
391 pin_as_deref_mut(self).poll_flush(cx)
392 }
393
394 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
395 pin_as_deref_mut(self).poll_shutdown(cx)
396 }
397}
398
399/// Polyfill for Pin::as_deref_mut()
400/// TODO: use Pin::as_deref_mut() instead once stabilized
401fn pin_as_deref_mut<P: DerefMut>(pin: Pin<&mut Pin<P>>) -> Pin<&mut P::Target> {
402 // SAFETY: we go directly from Pin<&mut Pin<P>> to Pin<&mut P::Target>, without moving or
403 // giving out the &mut Pin<P> in the process. See Pin::as_deref_mut() for more detail.
404 unsafe { pin.get_unchecked_mut() }.as_mut()
405}