broker_tokio/io/async_write.rs
1use bytes::Buf;
2use std::io;
3use std::ops::DerefMut;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7/// Writes bytes asynchronously.
8///
9/// The trait inherits from [`std::io::Write`] and indicates that an I/O object is
10/// **nonblocking**. All non-blocking I/O objects must return an error when
11/// bytes cannot be written instead of blocking the current thread.
12///
13/// Specifically, this means that the [`poll_write`] function will return one of
14/// the following:
15///
16/// * `Poll::Ready(Ok(n))` means that `n` bytes of data was immediately
17/// written.
18///
19/// * `Poll::Pending` means that no data was written from the buffer
20/// provided. The I/O object is not currently writable but may become writable
21/// in the future. Most importantly, **the current future's task is scheduled
22/// to get unparked when the object is writable**. This means that like
23/// `Future::poll` you'll receive a notification when the I/O object is
24/// writable again.
25///
26/// * `Poll::Ready(Err(e))` for other errors are standard I/O errors coming from the
27/// underlying object.
28///
29/// This trait importantly means that the [`write`][stdwrite] method only works in
30/// the context of a future's task. The object may panic if used outside of a task.
31///
32/// Note that this trait also represents that the [`Write::flush`][stdflush] method
33/// works very similarly to the `write` method, notably that `Ok(())` means that the
34/// writer has successfully been flushed, a "would block" error means that the
35/// current task is ready to receive a notification when flushing can make more
36/// progress, and otherwise normal errors can happen as well.
37///
38/// Utilities for working with `AsyncWrite` values are provided by
39/// [`AsyncWriteExt`].
40///
41/// [`std::io::Write`]: std::io::Write
42/// [`poll_write`]: AsyncWrite::poll_write()
43/// [stdwrite]: std::io::Write::write()
44/// [stdflush]: std::io::Write::flush()
45/// [`AsyncWriteExt`]: crate::io::AsyncWriteExt
46pub trait AsyncWrite {
47 /// Attempt to write bytes from `buf` into the object.
48 ///
49 /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
50 ///
51 /// If the object is not ready for writing, the method returns
52 /// `Poll::Pending` and arranges for the current task (via
53 /// `cx.waker()`) to receive a notification when the object becomes
54 /// readable or is closed.
55 fn poll_write(
56 self: Pin<&mut Self>,
57 cx: &mut Context<'_>,
58 buf: &[u8],
59 ) -> Poll<Result<usize, io::Error>>;
60
61 /// Attempt to flush the object, ensuring that any buffered data reach
62 /// their destination.
63 ///
64 /// On success, returns `Poll::Ready(Ok(()))`.
65 ///
66 /// If flushing cannot immediately complete, this method returns
67 /// `Poll::Pending` and arranges for the current task (via
68 /// `cx.waker()`) to receive a notification when the object can make
69 /// progress towards flushing.
70 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
71
72 /// Initiates or attempts to shut down this writer, returning success when
73 /// the I/O connection has completely shut down.
74 ///
75 /// This method is intended to be used for asynchronous shutdown of I/O
76 /// connections. For example this is suitable for implementing shutdown of a
77 /// TLS connection or calling `TcpStream::shutdown` on a proxied connection.
78 /// Protocols sometimes need to flush out final pieces of data or otherwise
79 /// perform a graceful shutdown handshake, reading/writing more data as
80 /// appropriate. This method is the hook for such protocols to implement the
81 /// graceful shutdown logic.
82 ///
83 /// This `shutdown` method is required by implementers of the
84 /// `AsyncWrite` trait. Wrappers typically just want to proxy this call
85 /// through to the wrapped type, and base types will typically implement
86 /// shutdown logic here or just return `Ok(().into())`. Note that if you're
87 /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that
88 /// transitively the entire stream has been shut down. After your wrapper's
89 /// shutdown logic has been executed you should shut down the underlying
90 /// stream.
91 ///
92 /// Invocation of a `shutdown` implies an invocation of `flush`. Once this
93 /// method returns `Ready` it implies that a flush successfully happened
94 /// before the shutdown happened. That is, callers don't need to call
95 /// `flush` before calling `shutdown`. They can rely that by calling
96 /// `shutdown` any pending buffered data will be written out.
97 ///
98 /// # Return value
99 ///
100 /// This function returns a `Poll<io::Result<()>>` classified as such:
101 ///
102 /// * `Poll::Ready(Ok(()))` - indicates that the connection was
103 /// successfully shut down and is now safe to deallocate/drop/close
104 /// resources associated with it. This method means that the current task
105 /// will no longer receive any notifications due to this method and the
106 /// I/O object itself is likely no longer usable.
107 ///
108 /// * `Poll::Pending` - indicates that shutdown is initiated but could
109 /// not complete just yet. This may mean that more I/O needs to happen to
110 /// continue this shutdown operation. The current task is scheduled to
111 /// receive a notification when it's otherwise ready to continue the
112 /// shutdown operation. When woken up this method should be called again.
113 ///
114 /// * `Poll::Ready(Err(e))` - indicates a fatal error has happened with shutdown,
115 /// indicating that the shutdown operation did not complete successfully.
116 /// This typically means that the I/O object is no longer usable.
117 ///
118 /// # Errors
119 ///
120 /// This function can return normal I/O errors through `Err`, described
121 /// above. Additionally this method may also render the underlying
122 /// `Write::write` method no longer usable (e.g. will return errors in the
123 /// future). It's recommended that once `shutdown` is called the
124 /// `write` method is no longer called.
125 ///
126 /// # Panics
127 ///
128 /// This function will panic if not called within the context of a future's
129 /// task.
130 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
131
132 /// Write a `Buf` into this value, returning how many bytes were written.
133 ///
134 /// Note that this method will advance the `buf` provided automatically by
135 /// the number of bytes written.
136 fn poll_write_buf<B: Buf>(
137 self: Pin<&mut Self>,
138 cx: &mut Context<'_>,
139 buf: &mut B,
140 ) -> Poll<Result<usize, io::Error>>
141 where
142 Self: Sized,
143 {
144 if !buf.has_remaining() {
145 return Poll::Ready(Ok(0));
146 }
147
148 let n = ready!(self.poll_write(cx, buf.bytes()))?;
149 buf.advance(n);
150 Poll::Ready(Ok(n))
151 }
152}
153
154macro_rules! deref_async_write {
155 () => {
156 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
157 -> Poll<io::Result<usize>>
158 {
159 Pin::new(&mut **self).poll_write(cx, buf)
160 }
161
162 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
163 Pin::new(&mut **self).poll_flush(cx)
164 }
165
166 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
167 Pin::new(&mut **self).poll_shutdown(cx)
168 }
169 }
170}
171
172impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
173 deref_async_write!();
174}
175
176impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
177 deref_async_write!();
178}
179
180impl<P> AsyncWrite for Pin<P>
181where
182 P: DerefMut + Unpin,
183 P::Target: AsyncWrite,
184{
185 fn poll_write(
186 self: Pin<&mut Self>,
187 cx: &mut Context<'_>,
188 buf: &[u8],
189 ) -> Poll<io::Result<usize>> {
190 self.get_mut().as_mut().poll_write(cx, buf)
191 }
192
193 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
194 self.get_mut().as_mut().poll_flush(cx)
195 }
196
197 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
198 self.get_mut().as_mut().poll_shutdown(cx)
199 }
200}
201
202impl AsyncWrite for Vec<u8> {
203 fn poll_write(
204 self: Pin<&mut Self>,
205 _cx: &mut Context<'_>,
206 buf: &[u8],
207 ) -> Poll<io::Result<usize>> {
208 self.get_mut().extend_from_slice(buf);
209 Poll::Ready(Ok(buf.len()))
210 }
211
212 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
213 Poll::Ready(Ok(()))
214 }
215
216 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
217 Poll::Ready(Ok(()))
218 }
219}
220
221impl AsyncWrite for io::Cursor<&mut [u8]> {
222 fn poll_write(
223 mut self: Pin<&mut Self>,
224 _: &mut Context<'_>,
225 buf: &[u8],
226 ) -> Poll<io::Result<usize>> {
227 Poll::Ready(io::Write::write(&mut *self, buf))
228 }
229
230 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
231 Poll::Ready(io::Write::flush(&mut *self))
232 }
233
234 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
235 self.poll_flush(cx)
236 }
237}
238
239impl AsyncWrite for io::Cursor<&mut Vec<u8>> {
240 fn poll_write(
241 mut self: Pin<&mut Self>,
242 _: &mut Context<'_>,
243 buf: &[u8],
244 ) -> Poll<io::Result<usize>> {
245 Poll::Ready(io::Write::write(&mut *self, buf))
246 }
247
248 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
249 Poll::Ready(io::Write::flush(&mut *self))
250 }
251
252 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
253 self.poll_flush(cx)
254 }
255}
256
257impl AsyncWrite for io::Cursor<Vec<u8>> {
258 fn poll_write(
259 mut self: Pin<&mut Self>,
260 _: &mut Context<'_>,
261 buf: &[u8],
262 ) -> Poll<io::Result<usize>> {
263 Poll::Ready(io::Write::write(&mut *self, buf))
264 }
265
266 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
267 Poll::Ready(io::Write::flush(&mut *self))
268 }
269
270 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
271 self.poll_flush(cx)
272 }
273}
274
275impl AsyncWrite for io::Cursor<Box<[u8]>> {
276 fn poll_write(
277 mut self: Pin<&mut Self>,
278 _: &mut Context<'_>,
279 buf: &[u8],
280 ) -> Poll<io::Result<usize>> {
281 Poll::Ready(io::Write::write(&mut *self, buf))
282 }
283
284 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
285 Poll::Ready(io::Write::flush(&mut *self))
286 }
287
288 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
289 self.poll_flush(cx)
290 }
291}