broker_tokio/io/poll_evented.rs
1use crate::io::driver::platform;
2use crate::io::{AsyncRead, AsyncWrite, Registration};
3
4use mio::event::Evented;
5use std::fmt;
6use std::io::{self, Read, Write};
7use std::marker::Unpin;
8use std::pin::Pin;
9use std::sync::atomic::AtomicUsize;
10use std::sync::atomic::Ordering::Relaxed;
11use std::task::{Context, Poll};
12
13cfg_io_driver! {
14 /// Associates an I/O resource that implements the [`std::io::Read`] and/or
15 /// [`std::io::Write`] traits with the reactor that drives it.
16 ///
17 /// `PollEvented` uses [`Registration`] internally to take a type that
18 /// implements [`mio::Evented`] as well as [`std::io::Read`] and or
19 /// [`std::io::Write`] and associate it with a reactor that will drive it.
20 ///
21 /// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be
22 /// used from within the future's execution model. As such, the
23 /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
24 /// implementations using the underlying I/O resource as well as readiness
25 /// events provided by the reactor.
26 ///
27 /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is
28 /// `Sync`), the caller must ensure that there are at most two tasks that
29 /// use a `PollEvented` instance concurrently. One for reading and one for
30 /// writing. While violating this requirement is "safe" from a Rust memory
31 /// model point of view, it will result in unexpected behavior in the form
32 /// of lost notifications and tasks hanging.
33 ///
34 /// ## Readiness events
35 ///
36 /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations,
37 /// this type also supports access to the underlying readiness event stream.
38 /// While similar in function to what [`Registration`] provides, the
39 /// semantics are a bit different.
40 ///
41 /// Two functions are provided to access the readiness events:
42 /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the
43 /// current readiness state of the `PollEvented` instance. If
44 /// [`poll_read_ready`] indicates read readiness, immediately calling
45 /// [`poll_read_ready`] again will also indicate read readiness.
46 ///
47 /// When the operation is attempted and is unable to succeed due to the I/O
48 /// resource not being ready, the caller must call [`clear_read_ready`] or
49 /// [`clear_write_ready`]. This clears the readiness state until a new
50 /// readiness event is received.
51 ///
52 /// This allows the caller to implement additional functions. For example,
53 /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
54 /// [`clear_read_ready`].
55 ///
56 /// ```rust
57 /// use tokio::io::PollEvented;
58 ///
59 /// use futures::ready;
60 /// use mio::Ready;
61 /// use mio::net::{TcpStream, TcpListener};
62 /// use std::io;
63 /// use std::task::{Context, Poll};
64 ///
65 /// struct MyListener {
66 /// poll_evented: PollEvented<TcpListener>,
67 /// }
68 ///
69 /// impl MyListener {
70 /// pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Result<TcpStream, io::Error>> {
71 /// let ready = Ready::readable();
72 ///
73 /// ready!(self.poll_evented.poll_read_ready(cx, ready))?;
74 ///
75 /// match self.poll_evented.get_ref().accept() {
76 /// Ok((socket, _)) => Poll::Ready(Ok(socket)),
77 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
78 /// self.poll_evented.clear_read_ready(cx, ready)?;
79 /// Poll::Pending
80 /// }
81 /// Err(e) => Poll::Ready(Err(e)),
82 /// }
83 /// }
84 /// }
85 /// ```
86 ///
87 /// ## Platform-specific events
88 ///
89 /// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
90 /// These events are included as part of the read readiness event stream. The
91 /// write readiness event stream is only for `Ready::writable()` events.
92 ///
93 /// [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
94 /// [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
95 /// [`AsyncRead`]: ../io/trait.AsyncRead.html
96 /// [`AsyncWrite`]: ../io/trait.AsyncWrite.html
97 /// [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html
98 /// [`Registration`]: struct.Registration.html
99 /// [`TcpListener`]: ../net/struct.TcpListener.html
100 /// [`clear_read_ready`]: #method.clear_read_ready
101 /// [`clear_write_ready`]: #method.clear_write_ready
102 /// [`poll_read_ready`]: #method.poll_read_ready
103 /// [`poll_write_ready`]: #method.poll_write_ready
104 pub struct PollEvented<E: Evented> {
105 io: Option<E>,
106 inner: Inner,
107 }
108}
109
110struct Inner {
111 registration: Registration,
112
113 /// Currently visible read readiness
114 read_readiness: AtomicUsize,
115
116 /// Currently visible write readiness
117 write_readiness: AtomicUsize,
118}
119
120// ===== impl PollEvented =====
121
122macro_rules! poll_ready {
123 ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{
124 // Load cached & encoded readiness.
125 let mut cached = $me.inner.$cache.load(Relaxed);
126 let mask = $mask | platform::hup();
127
128 // See if the current readiness matches any bits.
129 let mut ret = mio::Ready::from_usize(cached) & $mask;
130
131 if ret.is_empty() {
132 // Readiness does not match, consume the registration's readiness
133 // stream. This happens in a loop to ensure that the stream gets
134 // drained.
135 loop {
136 let ready = match $poll? {
137 Poll::Ready(v) => v,
138 Poll::Pending => return Poll::Pending,
139 };
140 cached |= ready.as_usize();
141
142 // Update the cache store
143 $me.inner.$cache.store(cached, Relaxed);
144
145 ret |= ready & mask;
146
147 if !ret.is_empty() {
148 return Poll::Ready(Ok(ret));
149 }
150 }
151 } else {
152 // Check what's new with the registration stream. This will not
153 // request to be notified
154 if let Some(ready) = $me.inner.registration.$take()? {
155 cached |= ready.as_usize();
156 $me.inner.$cache.store(cached, Relaxed);
157 }
158
159 Poll::Ready(Ok(mio::Ready::from_usize(cached)))
160 }
161 }};
162}
163
164impl<E> PollEvented<E>
165where
166 E: Evented,
167{
168 /// Creates a new `PollEvented` associated with the default reactor.
169 ///
170 /// # Panics
171 ///
172 /// This function panics if thread-local runtime is not set.
173 ///
174 /// The runtime is usually set implicitly when this function is called
175 /// from a future driven by a tokio runtime, otherwise runtime can be set
176 /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
177 pub fn new(io: E) -> io::Result<Self> {
178 let registration = Registration::new(&io)?;
179 Ok(Self {
180 io: Some(io),
181 inner: Inner {
182 registration,
183 read_readiness: AtomicUsize::new(0),
184 write_readiness: AtomicUsize::new(0),
185 },
186 })
187 }
188
189 /// Returns a shared reference to the underlying I/O object this readiness
190 /// stream is wrapping.
191 pub fn get_ref(&self) -> &E {
192 self.io.as_ref().unwrap()
193 }
194
195 /// Returns a mutable reference to the underlying I/O object this readiness
196 /// stream is wrapping.
197 pub fn get_mut(&mut self) -> &mut E {
198 self.io.as_mut().unwrap()
199 }
200
201 /// Consumes self, returning the inner I/O object
202 ///
203 /// This function will deregister the I/O resource from the reactor before
204 /// returning. If the deregistration operation fails, an error is returned.
205 ///
206 /// Note that deregistering does not guarantee that the I/O resource can be
207 /// registered with a different reactor. Some I/O resource types can only be
208 /// associated with a single reactor instance for their lifetime.
209 pub fn into_inner(mut self) -> io::Result<E> {
210 let io = self.io.take().unwrap();
211 self.inner.registration.deregister(&io)?;
212 Ok(io)
213 }
214
215 /// Check the I/O resource's read readiness state.
216 ///
217 /// The mask argument allows specifying what readiness to notify on. This
218 /// can be any value, including platform specific readiness, **except**
219 /// `writable`. HUP is always implicitly included on platforms that support
220 /// it.
221 ///
222 /// If the resource is not ready for a read then `Poll::Pending` is returned
223 /// and the current task is notified once a new event is received.
224 ///
225 /// The I/O resource will remain in a read-ready state until readiness is
226 /// cleared by calling [`clear_read_ready`].
227 ///
228 /// [`clear_read_ready`]: #method.clear_read_ready
229 ///
230 /// # Panics
231 ///
232 /// This function panics if:
233 ///
234 /// * `ready` includes writable.
235 /// * called from outside of a task context.
236 pub fn poll_read_ready(
237 &self,
238 cx: &mut Context<'_>,
239 mask: mio::Ready,
240 ) -> Poll<io::Result<mio::Ready>> {
241 assert!(!mask.is_writable(), "cannot poll for write readiness");
242 poll_ready!(
243 self,
244 mask,
245 read_readiness,
246 take_read_ready,
247 self.inner.registration.poll_read_ready(cx)
248 )
249 }
250
251 /// Clears the I/O resource's read readiness state and registers the current
252 /// task to be notified once a read readiness event is received.
253 ///
254 /// After calling this function, `poll_read_ready` will return
255 /// `Poll::Pending` until a new read readiness event has been received.
256 ///
257 /// The `mask` argument specifies the readiness bits to clear. This may not
258 /// include `writable` or `hup`.
259 ///
260 /// # Panics
261 ///
262 /// This function panics if:
263 ///
264 /// * `ready` includes writable or HUP
265 /// * called from outside of a task context.
266 pub fn clear_read_ready(&self, cx: &mut Context<'_>, ready: mio::Ready) -> io::Result<()> {
267 // Cannot clear write readiness
268 assert!(!ready.is_writable(), "cannot clear write readiness");
269 assert!(!platform::is_hup(ready), "cannot clear HUP readiness");
270
271 self.inner
272 .read_readiness
273 .fetch_and(!ready.as_usize(), Relaxed);
274
275 if self.poll_read_ready(cx, ready)?.is_ready() {
276 // Notify the current task
277 cx.waker().wake_by_ref();
278 }
279
280 Ok(())
281 }
282
283 /// Check the I/O resource's write readiness state.
284 ///
285 /// This always checks for writable readiness and also checks for HUP
286 /// readiness on platforms that support it.
287 ///
288 /// If the resource is not ready for a write then `Async::NotReady` is
289 /// returned and the current task is notified once a new event is received.
290 ///
291 /// The I/O resource will remain in a write-ready state until readiness is
292 /// cleared by calling [`clear_write_ready`].
293 ///
294 /// [`clear_write_ready`]: #method.clear_write_ready
295 ///
296 /// # Panics
297 ///
298 /// This function panics if:
299 ///
300 /// * `ready` contains bits besides `writable` and `hup`.
301 /// * called from outside of a task context.
302 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
303 poll_ready!(
304 self,
305 mio::Ready::writable(),
306 write_readiness,
307 take_write_ready,
308 self.inner.registration.poll_write_ready(cx)
309 )
310 }
311
312 /// Resets the I/O resource's write readiness state and registers the current
313 /// task to be notified once a write readiness event is received.
314 ///
315 /// This only clears writable readiness. HUP (on platforms that support HUP)
316 /// cannot be cleared as it is a final state.
317 ///
318 /// After calling this function, `poll_write_ready(Ready::writable())` will
319 /// return `NotReady` until a new write readiness event has been received.
320 ///
321 /// # Panics
322 ///
323 /// This function will panic if called from outside of a task context.
324 pub fn clear_write_ready(&self, cx: &mut Context<'_>) -> io::Result<()> {
325 let ready = mio::Ready::writable();
326
327 self.inner
328 .write_readiness
329 .fetch_and(!ready.as_usize(), Relaxed);
330
331 if self.poll_write_ready(cx)?.is_ready() {
332 // Notify the current task
333 cx.waker().wake_by_ref();
334 }
335
336 Ok(())
337 }
338}
339
340// ===== Read / Write impls =====
341
342impl<E> AsyncRead for PollEvented<E>
343where
344 E: Evented + Read + Unpin,
345{
346 fn poll_read(
347 mut self: Pin<&mut Self>,
348 cx: &mut Context<'_>,
349 buf: &mut [u8],
350 ) -> Poll<io::Result<usize>> {
351 ready!(self.poll_read_ready(cx, mio::Ready::readable()))?;
352
353 let r = (*self).get_mut().read(buf);
354
355 if is_wouldblock(&r) {
356 self.clear_read_ready(cx, mio::Ready::readable())?;
357 return Poll::Pending;
358 }
359
360 Poll::Ready(r)
361 }
362}
363
364impl<E> AsyncWrite for PollEvented<E>
365where
366 E: Evented + Write + Unpin,
367{
368 fn poll_write(
369 mut self: Pin<&mut Self>,
370 cx: &mut Context<'_>,
371 buf: &[u8],
372 ) -> Poll<io::Result<usize>> {
373 ready!(self.poll_write_ready(cx))?;
374
375 let r = (*self).get_mut().write(buf);
376
377 if is_wouldblock(&r) {
378 self.clear_write_ready(cx)?;
379 return Poll::Pending;
380 }
381
382 Poll::Ready(r)
383 }
384
385 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
386 ready!(self.poll_write_ready(cx))?;
387
388 let r = (*self).get_mut().flush();
389
390 if is_wouldblock(&r) {
391 self.clear_write_ready(cx)?;
392 return Poll::Pending;
393 }
394
395 Poll::Ready(r)
396 }
397
398 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
399 Poll::Ready(Ok(()))
400 }
401}
402
403fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
404 match *r {
405 Ok(_) => false,
406 Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
407 }
408}
409
410impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> {
411 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
412 f.debug_struct("PollEvented").field("io", &self.io).finish()
413 }
414}
415
416impl<E: Evented> Drop for PollEvented<E> {
417 fn drop(&mut self) {
418 if let Some(io) = self.io.take() {
419 // Ignore errors
420 let _ = self.inner.registration.deregister(&io);
421 }
422 }
423}