broker_tokio/io/
async_write.rs

1use bytes::Buf;
2use std::io;
3use std::ops::DerefMut;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7/// Writes bytes asynchronously.
8///
9/// The trait inherits from [`std::io::Write`] and indicates that an I/O object is
10/// **nonblocking**. All non-blocking I/O objects must return an error when
11/// bytes cannot be written instead of blocking the current thread.
12///
13/// Specifically, this means that the [`poll_write`] function will return one of
14/// the following:
15///
16/// * `Poll::Ready(Ok(n))` means that `n` bytes of data was immediately
17///   written.
18///
19/// * `Poll::Pending` means that no data was written from the buffer
20///   provided. The I/O object is not currently writable but may become writable
21///   in the future. Most importantly, **the current future's task is scheduled
22///   to get unparked when the object is writable**. This means that like
23///   `Future::poll` you'll receive a notification when the I/O object is
24///   writable again.
25///
26/// * `Poll::Ready(Err(e))` for other errors are standard I/O errors coming from the
27///   underlying object.
28///
29/// This trait importantly means that the [`write`][stdwrite] method only works in
30/// the context of a future's task. The object may panic if used outside of a task.
31///
32/// Note that this trait also represents that the  [`Write::flush`][stdflush] method
33/// works very similarly to the `write` method, notably that `Ok(())` means that the
34/// writer has successfully been flushed, a "would block" error means that the
35/// current task is ready to receive a notification when flushing can make more
36/// progress, and otherwise normal errors can happen as well.
37///
38/// Utilities for working with `AsyncWrite` values are provided by
39/// [`AsyncWriteExt`].
40///
41/// [`std::io::Write`]: std::io::Write
42/// [`poll_write`]: AsyncWrite::poll_write()
43/// [stdwrite]: std::io::Write::write()
44/// [stdflush]: std::io::Write::flush()
45/// [`AsyncWriteExt`]: crate::io::AsyncWriteExt
46pub trait AsyncWrite {
47    /// Attempt to write bytes from `buf` into the object.
48    ///
49    /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
50    ///
51    /// If the object is not ready for writing, the method returns
52    /// `Poll::Pending` and arranges for the current task (via
53    /// `cx.waker()`) to receive a notification when the object becomes
54    /// readable or is closed.
55    fn poll_write(
56        self: Pin<&mut Self>,
57        cx: &mut Context<'_>,
58        buf: &[u8],
59    ) -> Poll<Result<usize, io::Error>>;
60
61    /// Attempt to flush the object, ensuring that any buffered data reach
62    /// their destination.
63    ///
64    /// On success, returns `Poll::Ready(Ok(()))`.
65    ///
66    /// If flushing cannot immediately complete, this method returns
67    /// `Poll::Pending` and arranges for the current task (via
68    /// `cx.waker()`) to receive a notification when the object can make
69    /// progress towards flushing.
70    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
71
72    /// Initiates or attempts to shut down this writer, returning success when
73    /// the I/O connection has completely shut down.
74    ///
75    /// This method is intended to be used for asynchronous shutdown of I/O
76    /// connections. For example this is suitable for implementing shutdown of a
77    /// TLS connection or calling `TcpStream::shutdown` on a proxied connection.
78    /// Protocols sometimes need to flush out final pieces of data or otherwise
79    /// perform a graceful shutdown handshake, reading/writing more data as
80    /// appropriate. This method is the hook for such protocols to implement the
81    /// graceful shutdown logic.
82    ///
83    /// This `shutdown` method is required by implementers of the
84    /// `AsyncWrite` trait. Wrappers typically just want to proxy this call
85    /// through to the wrapped type, and base types will typically implement
86    /// shutdown logic here or just return `Ok(().into())`. Note that if you're
87    /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that
88    /// transitively the entire stream has been shut down. After your wrapper's
89    /// shutdown logic has been executed you should shut down the underlying
90    /// stream.
91    ///
92    /// Invocation of a `shutdown` implies an invocation of `flush`. Once this
93    /// method returns `Ready` it implies that a flush successfully happened
94    /// before the shutdown happened. That is, callers don't need to call
95    /// `flush` before calling `shutdown`. They can rely that by calling
96    /// `shutdown` any pending buffered data will be written out.
97    ///
98    /// # Return value
99    ///
100    /// This function returns a `Poll<io::Result<()>>` classified as such:
101    ///
102    /// * `Poll::Ready(Ok(()))` - indicates that the connection was
103    ///   successfully shut down and is now safe to deallocate/drop/close
104    ///   resources associated with it. This method means that the current task
105    ///   will no longer receive any notifications due to this method and the
106    ///   I/O object itself is likely no longer usable.
107    ///
108    /// * `Poll::Pending` - indicates that shutdown is initiated but could
109    ///   not complete just yet. This may mean that more I/O needs to happen to
110    ///   continue this shutdown operation. The current task is scheduled to
111    ///   receive a notification when it's otherwise ready to continue the
112    ///   shutdown operation. When woken up this method should be called again.
113    ///
114    /// * `Poll::Ready(Err(e))` - indicates a fatal error has happened with shutdown,
115    ///   indicating that the shutdown operation did not complete successfully.
116    ///   This typically means that the I/O object is no longer usable.
117    ///
118    /// # Errors
119    ///
120    /// This function can return normal I/O errors through `Err`, described
121    /// above. Additionally this method may also render the underlying
122    /// `Write::write` method no longer usable (e.g. will return errors in the
123    /// future). It's recommended that once `shutdown` is called the
124    /// `write` method is no longer called.
125    ///
126    /// # Panics
127    ///
128    /// This function will panic if not called within the context of a future's
129    /// task.
130    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
131
132    /// Write a `Buf` into this value, returning how many bytes were written.
133    ///
134    /// Note that this method will advance the `buf` provided automatically by
135    /// the number of bytes written.
136    fn poll_write_buf<B: Buf>(
137        self: Pin<&mut Self>,
138        cx: &mut Context<'_>,
139        buf: &mut B,
140    ) -> Poll<Result<usize, io::Error>>
141    where
142        Self: Sized,
143    {
144        if !buf.has_remaining() {
145            return Poll::Ready(Ok(0));
146        }
147
148        let n = ready!(self.poll_write(cx, buf.bytes()))?;
149        buf.advance(n);
150        Poll::Ready(Ok(n))
151    }
152}
153
154macro_rules! deref_async_write {
155    () => {
156        fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
157            -> Poll<io::Result<usize>>
158        {
159            Pin::new(&mut **self).poll_write(cx, buf)
160        }
161
162        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
163            Pin::new(&mut **self).poll_flush(cx)
164        }
165
166        fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
167            Pin::new(&mut **self).poll_shutdown(cx)
168        }
169    }
170}
171
172impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
173    deref_async_write!();
174}
175
176impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
177    deref_async_write!();
178}
179
180impl<P> AsyncWrite for Pin<P>
181where
182    P: DerefMut + Unpin,
183    P::Target: AsyncWrite,
184{
185    fn poll_write(
186        self: Pin<&mut Self>,
187        cx: &mut Context<'_>,
188        buf: &[u8],
189    ) -> Poll<io::Result<usize>> {
190        self.get_mut().as_mut().poll_write(cx, buf)
191    }
192
193    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
194        self.get_mut().as_mut().poll_flush(cx)
195    }
196
197    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
198        self.get_mut().as_mut().poll_shutdown(cx)
199    }
200}
201
202impl AsyncWrite for Vec<u8> {
203    fn poll_write(
204        self: Pin<&mut Self>,
205        _cx: &mut Context<'_>,
206        buf: &[u8],
207    ) -> Poll<io::Result<usize>> {
208        self.get_mut().extend_from_slice(buf);
209        Poll::Ready(Ok(buf.len()))
210    }
211
212    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
213        Poll::Ready(Ok(()))
214    }
215
216    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
217        Poll::Ready(Ok(()))
218    }
219}
220
221impl AsyncWrite for io::Cursor<&mut [u8]> {
222    fn poll_write(
223        mut self: Pin<&mut Self>,
224        _: &mut Context<'_>,
225        buf: &[u8],
226    ) -> Poll<io::Result<usize>> {
227        Poll::Ready(io::Write::write(&mut *self, buf))
228    }
229
230    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
231        Poll::Ready(io::Write::flush(&mut *self))
232    }
233
234    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
235        self.poll_flush(cx)
236    }
237}
238
239impl AsyncWrite for io::Cursor<&mut Vec<u8>> {
240    fn poll_write(
241        mut self: Pin<&mut Self>,
242        _: &mut Context<'_>,
243        buf: &[u8],
244    ) -> Poll<io::Result<usize>> {
245        Poll::Ready(io::Write::write(&mut *self, buf))
246    }
247
248    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
249        Poll::Ready(io::Write::flush(&mut *self))
250    }
251
252    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
253        self.poll_flush(cx)
254    }
255}
256
257impl AsyncWrite for io::Cursor<Vec<u8>> {
258    fn poll_write(
259        mut self: Pin<&mut Self>,
260        _: &mut Context<'_>,
261        buf: &[u8],
262    ) -> Poll<io::Result<usize>> {
263        Poll::Ready(io::Write::write(&mut *self, buf))
264    }
265
266    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
267        Poll::Ready(io::Write::flush(&mut *self))
268    }
269
270    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
271        self.poll_flush(cx)
272    }
273}
274
275impl AsyncWrite for io::Cursor<Box<[u8]>> {
276    fn poll_write(
277        mut self: Pin<&mut Self>,
278        _: &mut Context<'_>,
279        buf: &[u8],
280    ) -> Poll<io::Result<usize>> {
281        Poll::Ready(io::Write::write(&mut *self, buf))
282    }
283
284    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
285        Poll::Ready(io::Write::flush(&mut *self))
286    }
287
288    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
289        self.poll_flush(cx)
290    }
291}