tokio_io/async_write.rs
1use bytes::Buf;
2use futures::{Async, Poll};
3use std::io as std_io;
4
5use AsyncRead;
6
7/// Writes bytes asynchronously.
8///
9/// The trait inherits from `std::io::Write` and indicates that an I/O object is
10/// **nonblocking**. All non-blocking I/O objects must return an error when
11/// bytes cannot be written instead of blocking the current thread.
12///
13/// Specifically, this means that the `poll_write` function will return one of
14/// the following:
15///
16/// * `Ok(Async::Ready(n))` means that `n` bytes of data was immediately
17/// written.
18///
19/// * `Ok(Async::NotReady)` means that no data was written from the buffer
20/// provided. The I/O object is not currently writable but may become writable
21/// in the future. Most importantly, **the current future's task is scheduled
22/// to get unparked when the object is writable**. This means that like
23/// `Future::poll` you'll receive a notification when the I/O object is
24/// writable again.
25///
26/// * `Err(e)` for other errors are standard I/O errors coming from the
27/// underlying object.
28///
29/// This trait importantly means that the `write` method only works in the
30/// context of a future's task. The object may panic if used outside of a task.
31///
32/// Note that this trait also represents that the `Write::flush` method works
33/// very similarly to the `write` method, notably that `Ok(())` means that the
34/// writer has successfully been flushed, a "would block" error means that the
35/// current task is ready to receive a notification when flushing can make more
36/// progress, and otherwise normal errors can happen as well.
37pub trait AsyncWrite: std_io::Write {
38 /// Attempt to write bytes from `buf` into the object.
39 ///
40 /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
41 ///
42 /// If the object is not ready for writing, the method returns
43 /// `Ok(Async::NotReady)` and arranges for the current task (via
44 /// `cx.waker()`) to receive a notification when the object becomes
45 /// readable or is closed.
46 fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error> {
47 match self.write(buf) {
48 Ok(t) => Ok(Async::Ready(t)),
49 Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
50 Err(e) => return Err(e.into()),
51 }
52 }
53
54 /// Attempt to flush the object, ensuring that any buffered data reach
55 /// their destination.
56 ///
57 /// On success, returns `Ok(Async::Ready(()))`.
58 ///
59 /// If flushing cannot immediately complete, this method returns
60 /// `Ok(Async::NotReady)` and arranges for the current task (via
61 /// `cx.waker()`) to receive a notification when the object can make
62 /// progress towards flushing.
63 fn poll_flush(&mut self) -> Poll<(), std_io::Error> {
64 match self.flush() {
65 Ok(t) => Ok(Async::Ready(t)),
66 Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
67 Err(e) => return Err(e.into()),
68 }
69 }
70
71 /// Initiates or attempts to shut down this writer, returning success when
72 /// the I/O connection has completely shut down.
73 ///
74 /// This method is intended to be used for asynchronous shutdown of I/O
75 /// connections. For example this is suitable for implementing shutdown of a
76 /// TLS connection or calling `TcpStream::shutdown` on a proxied connection.
77 /// Protocols sometimes need to flush out final pieces of data or otherwise
78 /// perform a graceful shutdown handshake, reading/writing more data as
79 /// appropriate. This method is the hook for such protocols to implement the
80 /// graceful shutdown logic.
81 ///
82 /// This `shutdown` method is required by implementers of the
83 /// `AsyncWrite` trait. Wrappers typically just want to proxy this call
84 /// through to the wrapped type, and base types will typically implement
85 /// shutdown logic here or just return `Ok(().into())`. Note that if you're
86 /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that
87 /// transitively the entire stream has been shut down. After your wrapper's
88 /// shutdown logic has been executed you should shut down the underlying
89 /// stream.
90 ///
91 /// Invocation of a `shutdown` implies an invocation of `flush`. Once this
92 /// method returns `Ready` it implies that a flush successfully happened
93 /// before the shutdown happened. That is, callers don't need to call
94 /// `flush` before calling `shutdown`. They can rely that by calling
95 /// `shutdown` any pending buffered data will be written out.
96 ///
97 /// # Return value
98 ///
99 /// This function returns a `Poll<(), io::Error>` classified as such:
100 ///
101 /// * `Ok(Async::Ready(()))` - indicates that the connection was
102 /// successfully shut down and is now safe to deallocate/drop/close
103 /// resources associated with it. This method means that the current task
104 /// will no longer receive any notifications due to this method and the
105 /// I/O object itself is likely no longer usable.
106 ///
107 /// * `Ok(Async::NotReady)` - indicates that shutdown is initiated but could
108 /// not complete just yet. This may mean that more I/O needs to happen to
109 /// continue this shutdown operation. The current task is scheduled to
110 /// receive a notification when it's otherwise ready to continue the
111 /// shutdown operation. When woken up this method should be called again.
112 ///
113 /// * `Err(e)` - indicates a fatal error has happened with shutdown,
114 /// indicating that the shutdown operation did not complete successfully.
115 /// This typically means that the I/O object is no longer usable.
116 ///
117 /// # Errors
118 ///
119 /// This function can return normal I/O errors through `Err`, described
120 /// above. Additionally this method may also render the underlying
121 /// `Write::write` method no longer usable (e.g. will return errors in the
122 /// future). It's recommended that once `shutdown` is called the
123 /// `write` method is no longer called.
124 ///
125 /// # Panics
126 ///
127 /// This function will panic if not called within the context of a future's
128 /// task.
129 fn shutdown(&mut self) -> Poll<(), std_io::Error>;
130
131 /// Write a `Buf` into this value, returning how many bytes were written.
132 ///
133 /// Note that this method will advance the `buf` provided automatically by
134 /// the number of bytes written.
135 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error>
136 where
137 Self: Sized,
138 {
139 if !buf.has_remaining() {
140 return Ok(Async::Ready(0));
141 }
142
143 let n = try_ready!(self.poll_write(buf.bytes()));
144 buf.advance(n);
145 Ok(Async::Ready(n))
146 }
147}
148
149impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
150 fn shutdown(&mut self) -> Poll<(), std_io::Error> {
151 (**self).shutdown()
152 }
153}
154impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
155 fn shutdown(&mut self) -> Poll<(), std_io::Error> {
156 (**self).shutdown()
157 }
158}
159
160impl AsyncRead for std_io::Repeat {
161 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
162 false
163 }
164}
165
166impl AsyncWrite for std_io::Sink {
167 fn shutdown(&mut self) -> Poll<(), std_io::Error> {
168 Ok(().into())
169 }
170}
171
172impl<T: AsyncRead> AsyncRead for std_io::Take<T> {
173 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
174 self.get_ref().prepare_uninitialized_buffer(buf)
175 }
176}
177
178impl<T, U> AsyncRead for std_io::Chain<T, U>
179where
180 T: AsyncRead,
181 U: AsyncRead,
182{
183 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
184 let (t, u) = self.get_ref();
185 // We don't need to execute the second initializer if the first one
186 // already zeroed the buffer out.
187 t.prepare_uninitialized_buffer(buf) || u.prepare_uninitialized_buffer(buf)
188 }
189}
190
191impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> {
192 fn shutdown(&mut self) -> Poll<(), std_io::Error> {
193 try_ready!(self.poll_flush());
194 self.get_mut().shutdown()
195 }
196}
197
198impl<T: AsyncRead> AsyncRead for std_io::BufReader<T> {
199 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
200 self.get_ref().prepare_uninitialized_buffer(buf)
201 }
202}
203
204impl<T: AsRef<[u8]>> AsyncRead for std_io::Cursor<T> {}
205
206impl<'a> AsyncWrite for std_io::Cursor<&'a mut [u8]> {
207 fn shutdown(&mut self) -> Poll<(), std_io::Error> {
208 Ok(().into())
209 }
210}
211
212impl AsyncWrite for std_io::Cursor<Vec<u8>> {
213 fn shutdown(&mut self) -> Poll<(), std_io::Error> {
214 Ok(().into())
215 }
216}
217
218impl AsyncWrite for std_io::Cursor<Box<[u8]>> {
219 fn shutdown(&mut self) -> Poll<(), std_io::Error> {
220 Ok(().into())
221 }
222}