futures_io/
lib.rs

1//! Asynchronous I/O
2//!
3//! This crate contains the `AsyncRead` and `AsyncWrite` traits, the
4//! asynchronous analogs to `std::io::{Read, Write}`. The primary difference is
5//! that these traits integrate with the asynchronous task system.
6
7#![no_std]
8#![deny(missing_docs, missing_debug_implementations)]
9#![doc(html_rnoot_url = "https://docs.rs/futures-io/0.2.2")]
10
11macro_rules! if_std {
12    ($($i:item)*) => ($(
13        #[cfg(feature = "std")]
14        $i
15    )*)
16}
17
18if_std! {
19    extern crate futures_core;
20    extern crate iovec;
21    extern crate std;
22
23    use futures_core::{Async, Poll, task};
24    use std::boxed::Box;
25    use std::io as StdIo;
26    use std::ptr;
27    use std::vec::Vec;
28
29    // Re-export IoVec for convenience
30    pub use iovec::IoVec;
31
32    // Re-export io::Error so that users don't have to deal
33    // with conflicts when `use`ing `futures::io` and `std::io`.
34    pub use StdIo::Error as Error;
35    pub use StdIo::ErrorKind as ErrorKind;
36    pub use StdIo::Result as Result;
37
38    /// A type used to conditionally initialize buffers passed to `AsyncRead`
39    /// methods, modeled after `std`.
40    #[derive(Debug)]
41    pub struct Initializer(bool);
42
43    impl Initializer {
44        /// Returns a new `Initializer` which will zero out buffers.
45        #[inline]
46        pub fn zeroing() -> Initializer {
47            Initializer(true)
48        }
49
50        /// Returns a new `Initializer` which will not zero out buffers.
51        ///
52        /// # Safety
53        ///
54        /// This method may only be called by `AsyncRead`ers which guarantee
55        /// that they will not read from the buffers passed to `AsyncRead`
56        /// methods, and that the return value of the method accurately reflects
57        /// the number of bytes that have been written to the head of the buffer.
58        #[inline]
59        pub unsafe fn nop() -> Initializer {
60            Initializer(false)
61        }
62
63        /// Indicates if a buffer should be initialized.
64        #[inline]
65        pub fn should_initialize(&self) -> bool {
66            self.0
67        }
68
69        /// Initializes a buffer if necessary.
70        #[inline]
71        pub fn initialize(&self, buf: &mut [u8]) {
72            if self.should_initialize() {
73                unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) }
74            }
75        }
76    }
77
78    /// Read bytes asynchronously.
79    ///
80    /// This trait is analogous to the `std::io::Read` trait, but integrates
81    /// with the asynchronous task system. In particular, the `poll_read`
82    /// method, unlike `Read::read`, will automatically queue the current task
83    /// for wakeup and return if data is not yet available, rather than blocking
84    /// the calling thread.
85    pub trait AsyncRead {
86        /// Determines if this `AsyncRead`er can work with buffers of
87        /// uninitialized memory.
88        ///
89        /// The default implementation returns an initializer which will zero
90        /// buffers.
91        ///
92        /// # Safety
93        ///
94        /// This method is `unsafe` because and `AsyncRead`er could otherwise
95        /// return a non-zeroing `Initializer` from another `AsyncRead` type
96        /// without an `unsafe` block.
97        #[inline]
98        unsafe fn initializer(&self) -> Initializer {
99            Initializer::zeroing()
100        }
101
102        /// Attempt to read from the `AsyncRead` into `buf`.
103        ///
104        /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
105        ///
106        /// If no data is available for reading, the method returns
107        /// `Ok(Async::Pending)` and arranges for the current task (via
108        /// `cx.waker()`) to receive a notification when the object becomes
109        /// readable or is closed.
110        ///
111        /// # Implementation
112        ///
113        /// This function may not return errors of kind `WouldBlock` or
114        /// `Interrupted`.  Implementations must convert `WouldBlock` into
115        /// `Async::Pending` and either internally retry or convert
116        /// `Interrupted` into another error kind.
117        fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
118            -> Poll<usize, Error>;
119
120        /// Attempt to read from the `AsyncRead` into `vec` using vectored
121        /// IO operations.
122        ///
123        /// This method is similar to `poll_read`, but allows data to be read
124        /// into multiple buffers using a single operation.
125        ///
126        /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
127        ///
128        /// If no data is available for reading, the method returns
129        /// `Ok(Async::Pending)` and arranges for the current task (via
130        /// `cx.waker()`) to receive a notification when the object becomes
131        /// readable or is closed.
132        /// By default, this method delegates to using `poll_read` on the first
133        /// buffer in `vec`. Objects which support vectored IO should override
134        /// this method.
135        ///
136        /// # Implementation
137        ///
138        /// This function may not return errors of kind `WouldBlock` or
139        /// `Interrupted`.  Implementations must convert `WouldBlock` into
140        /// `Async::Pending` and either internally retry or convert
141        /// `Interrupted` into another error kind.
142        fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
143            -> Poll<usize, Error>
144        {
145            if let Some(ref mut first_iovec) = vec.get_mut(0) {
146                self.poll_read(cx, first_iovec)
147            } else {
148                // `vec` is empty.
149                return Ok(Async::Ready(0));
150            }
151        }
152    }
153
154    /// Write bytes asynchronously.
155    ///
156    /// This trait is analogous to the `std::io::Write` trait, but integrates
157    /// with the asynchronous task system. In particular, the `poll_write`
158    /// method, unlike `Write::write`, will automatically queue the current task
159    /// for wakeup and return if data is not yet available, rather than blocking
160    /// the calling thread.
161    pub trait AsyncWrite {
162        /// Attempt to write bytes from `buf` into the object.
163        ///
164        /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
165        ///
166        /// If the object is not ready for writing, the method returns
167        /// `Ok(Async::Pending)` and arranges for the current task (via
168        /// `cx.waker()`) to receive a notification when the object becomes
169        /// readable or is closed.
170        ///
171        /// # Implementation
172        ///
173        /// This function may not return errors of kind `WouldBlock` or
174        /// `Interrupted`.  Implementations must convert `WouldBlock` into
175        /// `Async::Pending` and either internally retry or convert
176        /// `Interrupted` into another error kind.
177        fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8])
178            -> Poll<usize, Error>;
179
180        /// Attempt to write bytes from `vec` into the object using vectored
181        /// IO operations.
182        ///
183        /// This method is similar to `poll_write`, but allows data from multiple buffers to be written
184        /// using a single operation.
185        ///
186        /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
187        ///
188        /// If the object is not ready for writing, the method returns
189        /// `Ok(Async::Pending)` and arranges for the current task (via
190        /// `cx.waker()`) to receive a notification when the object becomes
191        /// readable or is closed.
192        ///
193        /// By default, this method delegates to using `poll_write` on the first
194        /// buffer in `vec`. Objects which support vectored IO should override
195        /// this method.
196        ///
197        /// # Implementation
198        ///
199        /// This function may not return errors of kind `WouldBlock` or
200        /// `Interrupted`.  Implementations must convert `WouldBlock` into
201        /// `Async::Pending` and either internally retry or convert
202        /// `Interrupted` into another error kind.
203        fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec])
204            -> Poll<usize, Error>
205        {
206            if let Some(ref first_iovec) = vec.get(0) {
207                self.poll_write(cx, &*first_iovec)
208            } else {
209                // `vec` is empty.
210                return Ok(Async::Ready(0));
211            }
212        }
213
214        /// Attempt to flush the object, ensuring that any buffered data reach
215        /// their destination.
216        ///
217        /// On success, returns `Ok(Async::Ready(()))`.
218        ///
219        /// If flushing cannot immediately complete, this method returns
220        /// `Ok(Async::Pending)` and arranges for the current task (via
221        /// `cx.waker()`) to receive a notification when the object can make
222        /// progress towards flushing.
223        ///
224        /// # Implementation
225        ///
226        /// This function may not return errors of kind `WouldBlock` or
227        /// `Interrupted`.  Implementations must convert `WouldBlock` into
228        /// `Async::Pending` and either internally retry or convert
229        /// `Interrupted` into another error kind.
230        fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
231
232        /// Attempt to close the object.
233        ///
234        /// On success, returns `Ok(Async::Ready(()))`.
235        ///
236        /// If closing cannot immediately complete, this function returns
237        /// `Ok(Async::Pending)` and arranges for the current task (via
238        /// `cx.waker()`) to receive a notification when the object can make
239        /// progress towards closing.
240        ///
241        /// # Implementation
242        ///
243        /// This function may not return errors of kind `WouldBlock` or
244        /// `Interrupted`.  Implementations must convert `WouldBlock` into
245        /// `Async::Pending` and either internally retry or convert
246        /// `Interrupted` into another error kind.
247        fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
248    }
249
250    macro_rules! deref_async_read {
251        () => {
252            unsafe fn initializer(&self) -> Initializer {
253                (**self).initializer()
254            }
255
256            fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
257                -> Poll<usize, Error>
258            {
259                (**self).poll_read(cx, buf)
260            }
261
262            fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
263                -> Poll<usize, Error>
264            {
265                (**self).poll_vectored_read(cx, vec)
266            }
267        }
268    }
269
270    impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> {
271        deref_async_read!();
272    }
273
274    impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T {
275        deref_async_read!();
276    }
277
278    /// `unsafe` because the `StdIo::Read` type must not access the buffer
279    /// before reading data into it.
280    macro_rules! unsafe_delegate_async_read_to_stdio {
281        () => {
282            unsafe fn initializer(&self) -> Initializer {
283                Initializer::nop()
284            }
285
286            fn poll_read(&mut self, _: &mut task::Context, buf: &mut [u8])
287                -> Poll<usize, Error>
288            {
289                Ok(Async::Ready(StdIo::Read::read(self, buf)?))
290            }
291        }
292    }
293
294    impl<'a> AsyncRead for &'a [u8] {
295        unsafe_delegate_async_read_to_stdio!();
296    }
297
298    impl AsyncRead for StdIo::Repeat {
299        unsafe_delegate_async_read_to_stdio!();
300    }
301
302    impl<T: AsRef<[u8]>> AsyncRead for StdIo::Cursor<T> {
303        unsafe_delegate_async_read_to_stdio!();
304    }
305
306    macro_rules! deref_async_write {
307        () => {
308            fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8])
309                -> Poll<usize, Error>
310            {
311                (**self).poll_write(cx, buf)
312            }
313
314            fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec])
315                -> Poll<usize, Error>
316            {
317                (**self).poll_vectored_write(cx, vec)
318            }
319
320            fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
321                (**self).poll_flush(cx)
322            }
323
324            fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
325                (**self).poll_close(cx)
326            }
327        }
328    }
329
330    impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
331        deref_async_write!();
332    }
333
334    impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
335        deref_async_write!();
336    }
337
338    macro_rules! delegate_async_write_to_stdio {
339        () => {
340            fn poll_write(&mut self, _: &mut task::Context, buf: &[u8])
341                -> Poll<usize, Error>
342            {
343                Ok(Async::Ready(StdIo::Write::write(self, buf)?))
344            }
345
346            fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Error> {
347                Ok(Async::Ready(StdIo::Write::flush(self)?))
348            }
349
350            fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
351                self.poll_flush(cx)
352            }
353        }
354    }
355
356    impl<'a> AsyncWrite for StdIo::Cursor<&'a mut [u8]> {
357        delegate_async_write_to_stdio!();
358    }
359
360    impl AsyncWrite for StdIo::Cursor<Vec<u8>> {
361        delegate_async_write_to_stdio!();
362    }
363
364    impl AsyncWrite for StdIo::Cursor<Box<[u8]>> {
365        delegate_async_write_to_stdio!();
366    }
367
368    impl AsyncWrite for StdIo::Sink {
369        delegate_async_write_to_stdio!();
370    }
371}