futures_io/lib.rs
1//! Asynchronous I/O
2//!
3//! This crate contains the `AsyncRead` and `AsyncWrite` traits, the
4//! asynchronous analogs to `std::io::{Read, Write}`. The primary difference is
5//! that these traits integrate with the asynchronous task system.
6
7#![no_std]
8#![deny(missing_docs, missing_debug_implementations)]
9#![doc(html_rnoot_url = "https://docs.rs/futures-io/0.2.2")]
10
11macro_rules! if_std {
12 ($($i:item)*) => ($(
13 #[cfg(feature = "std")]
14 $i
15 )*)
16}
17
18if_std! {
19 extern crate futures_core;
20 extern crate iovec;
21 extern crate std;
22
23 use futures_core::{Async, Poll, task};
24 use std::boxed::Box;
25 use std::io as StdIo;
26 use std::ptr;
27 use std::vec::Vec;
28
29 // Re-export IoVec for convenience
30 pub use iovec::IoVec;
31
32 // Re-export io::Error so that users don't have to deal
33 // with conflicts when `use`ing `futures::io` and `std::io`.
34 pub use StdIo::Error as Error;
35 pub use StdIo::ErrorKind as ErrorKind;
36 pub use StdIo::Result as Result;
37
38 /// A type used to conditionally initialize buffers passed to `AsyncRead`
39 /// methods, modeled after `std`.
40 #[derive(Debug)]
41 pub struct Initializer(bool);
42
43 impl Initializer {
44 /// Returns a new `Initializer` which will zero out buffers.
45 #[inline]
46 pub fn zeroing() -> Initializer {
47 Initializer(true)
48 }
49
50 /// Returns a new `Initializer` which will not zero out buffers.
51 ///
52 /// # Safety
53 ///
54 /// This method may only be called by `AsyncRead`ers which guarantee
55 /// that they will not read from the buffers passed to `AsyncRead`
56 /// methods, and that the return value of the method accurately reflects
57 /// the number of bytes that have been written to the head of the buffer.
58 #[inline]
59 pub unsafe fn nop() -> Initializer {
60 Initializer(false)
61 }
62
63 /// Indicates if a buffer should be initialized.
64 #[inline]
65 pub fn should_initialize(&self) -> bool {
66 self.0
67 }
68
69 /// Initializes a buffer if necessary.
70 #[inline]
71 pub fn initialize(&self, buf: &mut [u8]) {
72 if self.should_initialize() {
73 unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) }
74 }
75 }
76 }
77
78 /// Read bytes asynchronously.
79 ///
80 /// This trait is analogous to the `std::io::Read` trait, but integrates
81 /// with the asynchronous task system. In particular, the `poll_read`
82 /// method, unlike `Read::read`, will automatically queue the current task
83 /// for wakeup and return if data is not yet available, rather than blocking
84 /// the calling thread.
85 pub trait AsyncRead {
86 /// Determines if this `AsyncRead`er can work with buffers of
87 /// uninitialized memory.
88 ///
89 /// The default implementation returns an initializer which will zero
90 /// buffers.
91 ///
92 /// # Safety
93 ///
94 /// This method is `unsafe` because and `AsyncRead`er could otherwise
95 /// return a non-zeroing `Initializer` from another `AsyncRead` type
96 /// without an `unsafe` block.
97 #[inline]
98 unsafe fn initializer(&self) -> Initializer {
99 Initializer::zeroing()
100 }
101
102 /// Attempt to read from the `AsyncRead` into `buf`.
103 ///
104 /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
105 ///
106 /// If no data is available for reading, the method returns
107 /// `Ok(Async::Pending)` and arranges for the current task (via
108 /// `cx.waker()`) to receive a notification when the object becomes
109 /// readable or is closed.
110 ///
111 /// # Implementation
112 ///
113 /// This function may not return errors of kind `WouldBlock` or
114 /// `Interrupted`. Implementations must convert `WouldBlock` into
115 /// `Async::Pending` and either internally retry or convert
116 /// `Interrupted` into another error kind.
117 fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
118 -> Poll<usize, Error>;
119
120 /// Attempt to read from the `AsyncRead` into `vec` using vectored
121 /// IO operations.
122 ///
123 /// This method is similar to `poll_read`, but allows data to be read
124 /// into multiple buffers using a single operation.
125 ///
126 /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
127 ///
128 /// If no data is available for reading, the method returns
129 /// `Ok(Async::Pending)` and arranges for the current task (via
130 /// `cx.waker()`) to receive a notification when the object becomes
131 /// readable or is closed.
132 /// By default, this method delegates to using `poll_read` on the first
133 /// buffer in `vec`. Objects which support vectored IO should override
134 /// this method.
135 ///
136 /// # Implementation
137 ///
138 /// This function may not return errors of kind `WouldBlock` or
139 /// `Interrupted`. Implementations must convert `WouldBlock` into
140 /// `Async::Pending` and either internally retry or convert
141 /// `Interrupted` into another error kind.
142 fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
143 -> Poll<usize, Error>
144 {
145 if let Some(ref mut first_iovec) = vec.get_mut(0) {
146 self.poll_read(cx, first_iovec)
147 } else {
148 // `vec` is empty.
149 return Ok(Async::Ready(0));
150 }
151 }
152 }
153
154 /// Write bytes asynchronously.
155 ///
156 /// This trait is analogous to the `std::io::Write` trait, but integrates
157 /// with the asynchronous task system. In particular, the `poll_write`
158 /// method, unlike `Write::write`, will automatically queue the current task
159 /// for wakeup and return if data is not yet available, rather than blocking
160 /// the calling thread.
161 pub trait AsyncWrite {
162 /// Attempt to write bytes from `buf` into the object.
163 ///
164 /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
165 ///
166 /// If the object is not ready for writing, the method returns
167 /// `Ok(Async::Pending)` and arranges for the current task (via
168 /// `cx.waker()`) to receive a notification when the object becomes
169 /// readable or is closed.
170 ///
171 /// # Implementation
172 ///
173 /// This function may not return errors of kind `WouldBlock` or
174 /// `Interrupted`. Implementations must convert `WouldBlock` into
175 /// `Async::Pending` and either internally retry or convert
176 /// `Interrupted` into another error kind.
177 fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8])
178 -> Poll<usize, Error>;
179
180 /// Attempt to write bytes from `vec` into the object using vectored
181 /// IO operations.
182 ///
183 /// This method is similar to `poll_write`, but allows data from multiple buffers to be written
184 /// using a single operation.
185 ///
186 /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
187 ///
188 /// If the object is not ready for writing, the method returns
189 /// `Ok(Async::Pending)` and arranges for the current task (via
190 /// `cx.waker()`) to receive a notification when the object becomes
191 /// readable or is closed.
192 ///
193 /// By default, this method delegates to using `poll_write` on the first
194 /// buffer in `vec`. Objects which support vectored IO should override
195 /// this method.
196 ///
197 /// # Implementation
198 ///
199 /// This function may not return errors of kind `WouldBlock` or
200 /// `Interrupted`. Implementations must convert `WouldBlock` into
201 /// `Async::Pending` and either internally retry or convert
202 /// `Interrupted` into another error kind.
203 fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec])
204 -> Poll<usize, Error>
205 {
206 if let Some(ref first_iovec) = vec.get(0) {
207 self.poll_write(cx, &*first_iovec)
208 } else {
209 // `vec` is empty.
210 return Ok(Async::Ready(0));
211 }
212 }
213
214 /// Attempt to flush the object, ensuring that any buffered data reach
215 /// their destination.
216 ///
217 /// On success, returns `Ok(Async::Ready(()))`.
218 ///
219 /// If flushing cannot immediately complete, this method returns
220 /// `Ok(Async::Pending)` and arranges for the current task (via
221 /// `cx.waker()`) to receive a notification when the object can make
222 /// progress towards flushing.
223 ///
224 /// # Implementation
225 ///
226 /// This function may not return errors of kind `WouldBlock` or
227 /// `Interrupted`. Implementations must convert `WouldBlock` into
228 /// `Async::Pending` and either internally retry or convert
229 /// `Interrupted` into another error kind.
230 fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
231
232 /// Attempt to close the object.
233 ///
234 /// On success, returns `Ok(Async::Ready(()))`.
235 ///
236 /// If closing cannot immediately complete, this function returns
237 /// `Ok(Async::Pending)` and arranges for the current task (via
238 /// `cx.waker()`) to receive a notification when the object can make
239 /// progress towards closing.
240 ///
241 /// # Implementation
242 ///
243 /// This function may not return errors of kind `WouldBlock` or
244 /// `Interrupted`. Implementations must convert `WouldBlock` into
245 /// `Async::Pending` and either internally retry or convert
246 /// `Interrupted` into another error kind.
247 fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
248 }
249
250 macro_rules! deref_async_read {
251 () => {
252 unsafe fn initializer(&self) -> Initializer {
253 (**self).initializer()
254 }
255
256 fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
257 -> Poll<usize, Error>
258 {
259 (**self).poll_read(cx, buf)
260 }
261
262 fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
263 -> Poll<usize, Error>
264 {
265 (**self).poll_vectored_read(cx, vec)
266 }
267 }
268 }
269
270 impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> {
271 deref_async_read!();
272 }
273
274 impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T {
275 deref_async_read!();
276 }
277
278 /// `unsafe` because the `StdIo::Read` type must not access the buffer
279 /// before reading data into it.
280 macro_rules! unsafe_delegate_async_read_to_stdio {
281 () => {
282 unsafe fn initializer(&self) -> Initializer {
283 Initializer::nop()
284 }
285
286 fn poll_read(&mut self, _: &mut task::Context, buf: &mut [u8])
287 -> Poll<usize, Error>
288 {
289 Ok(Async::Ready(StdIo::Read::read(self, buf)?))
290 }
291 }
292 }
293
294 impl<'a> AsyncRead for &'a [u8] {
295 unsafe_delegate_async_read_to_stdio!();
296 }
297
298 impl AsyncRead for StdIo::Repeat {
299 unsafe_delegate_async_read_to_stdio!();
300 }
301
302 impl<T: AsRef<[u8]>> AsyncRead for StdIo::Cursor<T> {
303 unsafe_delegate_async_read_to_stdio!();
304 }
305
306 macro_rules! deref_async_write {
307 () => {
308 fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8])
309 -> Poll<usize, Error>
310 {
311 (**self).poll_write(cx, buf)
312 }
313
314 fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec])
315 -> Poll<usize, Error>
316 {
317 (**self).poll_vectored_write(cx, vec)
318 }
319
320 fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
321 (**self).poll_flush(cx)
322 }
323
324 fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
325 (**self).poll_close(cx)
326 }
327 }
328 }
329
330 impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
331 deref_async_write!();
332 }
333
334 impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
335 deref_async_write!();
336 }
337
338 macro_rules! delegate_async_write_to_stdio {
339 () => {
340 fn poll_write(&mut self, _: &mut task::Context, buf: &[u8])
341 -> Poll<usize, Error>
342 {
343 Ok(Async::Ready(StdIo::Write::write(self, buf)?))
344 }
345
346 fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Error> {
347 Ok(Async::Ready(StdIo::Write::flush(self)?))
348 }
349
350 fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
351 self.poll_flush(cx)
352 }
353 }
354 }
355
356 impl<'a> AsyncWrite for StdIo::Cursor<&'a mut [u8]> {
357 delegate_async_write_to_stdio!();
358 }
359
360 impl AsyncWrite for StdIo::Cursor<Vec<u8>> {
361 delegate_async_write_to_stdio!();
362 }
363
364 impl AsyncWrite for StdIo::Cursor<Box<[u8]>> {
365 delegate_async_write_to_stdio!();
366 }
367
368 impl AsyncWrite for StdIo::Sink {
369 delegate_async_write_to_stdio!();
370 }
371}