broker_tokio/io/
poll_evented.rs

1use crate::io::driver::platform;
2use crate::io::{AsyncRead, AsyncWrite, Registration};
3
4use mio::event::Evented;
5use std::fmt;
6use std::io::{self, Read, Write};
7use std::marker::Unpin;
8use std::pin::Pin;
9use std::sync::atomic::AtomicUsize;
10use std::sync::atomic::Ordering::Relaxed;
11use std::task::{Context, Poll};
12
13cfg_io_driver! {
14    /// Associates an I/O resource that implements the [`std::io::Read`] and/or
15    /// [`std::io::Write`] traits with the reactor that drives it.
16    ///
17    /// `PollEvented` uses [`Registration`] internally to take a type that
18    /// implements [`mio::Evented`] as well as [`std::io::Read`] and or
19    /// [`std::io::Write`] and associate it with a reactor that will drive it.
20    ///
21    /// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be
22    /// used from within the future's execution model. As such, the
23    /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
24    /// implementations using the underlying I/O resource as well as readiness
25    /// events provided by the reactor.
26    ///
27    /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is
28    /// `Sync`), the caller must ensure that there are at most two tasks that
29    /// use a `PollEvented` instance concurrently. One for reading and one for
30    /// writing. While violating this requirement is "safe" from a Rust memory
31    /// model point of view, it will result in unexpected behavior in the form
32    /// of lost notifications and tasks hanging.
33    ///
34    /// ## Readiness events
35    ///
36    /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations,
37    /// this type also supports access to the underlying readiness event stream.
38    /// While similar in function to what [`Registration`] provides, the
39    /// semantics are a bit different.
40    ///
41    /// Two functions are provided to access the readiness events:
42    /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the
43    /// current readiness state of the `PollEvented` instance. If
44    /// [`poll_read_ready`] indicates read readiness, immediately calling
45    /// [`poll_read_ready`] again will also indicate read readiness.
46    ///
47    /// When the operation is attempted and is unable to succeed due to the I/O
48    /// resource not being ready, the caller must call [`clear_read_ready`] or
49    /// [`clear_write_ready`]. This clears the readiness state until a new
50    /// readiness event is received.
51    ///
52    /// This allows the caller to implement additional functions. For example,
53    /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
54    /// [`clear_read_ready`].
55    ///
56    /// ```rust
57    /// use tokio::io::PollEvented;
58    ///
59    /// use futures::ready;
60    /// use mio::Ready;
61    /// use mio::net::{TcpStream, TcpListener};
62    /// use std::io;
63    /// use std::task::{Context, Poll};
64    ///
65    /// struct MyListener {
66    ///     poll_evented: PollEvented<TcpListener>,
67    /// }
68    ///
69    /// impl MyListener {
70    ///     pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Result<TcpStream, io::Error>> {
71    ///         let ready = Ready::readable();
72    ///
73    ///         ready!(self.poll_evented.poll_read_ready(cx, ready))?;
74    ///
75    ///         match self.poll_evented.get_ref().accept() {
76    ///             Ok((socket, _)) => Poll::Ready(Ok(socket)),
77    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
78    ///                 self.poll_evented.clear_read_ready(cx, ready)?;
79    ///                 Poll::Pending
80    ///             }
81    ///             Err(e) => Poll::Ready(Err(e)),
82    ///         }
83    ///     }
84    /// }
85    /// ```
86    ///
87    /// ## Platform-specific events
88    ///
89    /// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
90    /// These events are included as part of the read readiness event stream. The
91    /// write readiness event stream is only for `Ready::writable()` events.
92    ///
93    /// [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
94    /// [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
95    /// [`AsyncRead`]: ../io/trait.AsyncRead.html
96    /// [`AsyncWrite`]: ../io/trait.AsyncWrite.html
97    /// [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html
98    /// [`Registration`]: struct.Registration.html
99    /// [`TcpListener`]: ../net/struct.TcpListener.html
100    /// [`clear_read_ready`]: #method.clear_read_ready
101    /// [`clear_write_ready`]: #method.clear_write_ready
102    /// [`poll_read_ready`]: #method.poll_read_ready
103    /// [`poll_write_ready`]: #method.poll_write_ready
104    pub struct PollEvented<E: Evented> {
105        io: Option<E>,
106        inner: Inner,
107    }
108}
109
110struct Inner {
111    registration: Registration,
112
113    /// Currently visible read readiness
114    read_readiness: AtomicUsize,
115
116    /// Currently visible write readiness
117    write_readiness: AtomicUsize,
118}
119
120// ===== impl PollEvented =====
121
// Shared body of `poll_read_ready` and `poll_write_ready`.
//
// Arguments:
//
// * `$me`    - value whose `inner` field holds the registration and caches.
// * `$mask`  - readiness bits the caller is interested in.
// * `$cache` - name of the `Inner` atomic caching readiness for this direction.
// * `$take`  - name of the registration method that picks up new readiness
//              without requesting a notification.
// * `$poll`  - expression that polls the registration; it is evaluated again
//              on every iteration of the drain loop.
//
// The expansion evaluates to a `Poll` wrapping `Ok(readiness)`. It contains
// `return` statements and the `?` operator, so it can only be used as the tail
// of a function whose return type accepts both.
macro_rules! poll_ready {
    ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{
        // Load cached & encoded readiness.
        let mut cached = $me.inner.$cache.load(Relaxed);

        // Evaluate the caller's mask expression exactly once; it is needed
        // both on its own and combined with HUP.
        let requested = $mask;
        let mask = requested | platform::hup();

        // See if the current readiness matches any of the requested bits.
        //
        // NOTE(review): this fast-path check uses the caller's mask *without*
        // HUP, whereas the drain loop below matches against `mask` (with HUP).
        // That asymmetry is preserved as-is -- confirm it is intentional.
        let mut ret = mio::Ready::from_usize(cached) & requested;

        if ret.is_empty() {
            // Readiness does not match, consume the registration's readiness
            // stream. This happens in a loop to ensure that the stream gets
            // drained.
            loop {
                let ready = match $poll? {
                    Poll::Ready(v) => v,
                    Poll::Pending => return Poll::Pending,
                };
                cached |= ready.as_usize();

                // Update the cache store
                $me.inner.$cache.store(cached, Relaxed);

                ret |= ready & mask;

                if !ret.is_empty() {
                    return Poll::Ready(Ok(ret));
                }
            }
        } else {
            // Check what's new with the registration stream. This will not
            // request to be notified
            if let Some(ready) = $me.inner.registration.$take()? {
                cached |= ready.as_usize();
                $me.inner.$cache.store(cached, Relaxed);
            }

            // On this path the full cached readiness is reported, not just
            // the bits that matched the request.
            Poll::Ready(Ok(mio::Ready::from_usize(cached)))
        }
    }};
}
163
164impl<E> PollEvented<E>
165where
166    E: Evented,
167{
168    /// Creates a new `PollEvented` associated with the default reactor.
169    ///
170    /// # Panics
171    ///
172    /// This function panics if thread-local runtime is not set.
173    ///
174    /// The runtime is usually set implicitly when this function is called
175    /// from a future driven by a tokio runtime, otherwise runtime can be set
176    /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
177    pub fn new(io: E) -> io::Result<Self> {
178        let registration = Registration::new(&io)?;
179        Ok(Self {
180            io: Some(io),
181            inner: Inner {
182                registration,
183                read_readiness: AtomicUsize::new(0),
184                write_readiness: AtomicUsize::new(0),
185            },
186        })
187    }
188
189    /// Returns a shared reference to the underlying I/O object this readiness
190    /// stream is wrapping.
191    pub fn get_ref(&self) -> &E {
192        self.io.as_ref().unwrap()
193    }
194
195    /// Returns a mutable reference to the underlying I/O object this readiness
196    /// stream is wrapping.
197    pub fn get_mut(&mut self) -> &mut E {
198        self.io.as_mut().unwrap()
199    }
200
201    /// Consumes self, returning the inner I/O object
202    ///
203    /// This function will deregister the I/O resource from the reactor before
204    /// returning. If the deregistration operation fails, an error is returned.
205    ///
206    /// Note that deregistering does not guarantee that the I/O resource can be
207    /// registered with a different reactor. Some I/O resource types can only be
208    /// associated with a single reactor instance for their lifetime.
209    pub fn into_inner(mut self) -> io::Result<E> {
210        let io = self.io.take().unwrap();
211        self.inner.registration.deregister(&io)?;
212        Ok(io)
213    }
214
215    /// Check the I/O resource's read readiness state.
216    ///
217    /// The mask argument allows specifying what readiness to notify on. This
218    /// can be any value, including platform specific readiness, **except**
219    /// `writable`. HUP is always implicitly included on platforms that support
220    /// it.
221    ///
222    /// If the resource is not ready for a read then `Poll::Pending` is returned
223    /// and the current task is notified once a new event is received.
224    ///
225    /// The I/O resource will remain in a read-ready state until readiness is
226    /// cleared by calling [`clear_read_ready`].
227    ///
228    /// [`clear_read_ready`]: #method.clear_read_ready
229    ///
230    /// # Panics
231    ///
232    /// This function panics if:
233    ///
234    /// * `ready` includes writable.
235    /// * called from outside of a task context.
236    pub fn poll_read_ready(
237        &self,
238        cx: &mut Context<'_>,
239        mask: mio::Ready,
240    ) -> Poll<io::Result<mio::Ready>> {
241        assert!(!mask.is_writable(), "cannot poll for write readiness");
242        poll_ready!(
243            self,
244            mask,
245            read_readiness,
246            take_read_ready,
247            self.inner.registration.poll_read_ready(cx)
248        )
249    }
250
251    /// Clears the I/O resource's read readiness state and registers the current
252    /// task to be notified once a read readiness event is received.
253    ///
254    /// After calling this function, `poll_read_ready` will return
255    /// `Poll::Pending` until a new read readiness event has been received.
256    ///
257    /// The `mask` argument specifies the readiness bits to clear. This may not
258    /// include `writable` or `hup`.
259    ///
260    /// # Panics
261    ///
262    /// This function panics if:
263    ///
264    /// * `ready` includes writable or HUP
265    /// * called from outside of a task context.
266    pub fn clear_read_ready(&self, cx: &mut Context<'_>, ready: mio::Ready) -> io::Result<()> {
267        // Cannot clear write readiness
268        assert!(!ready.is_writable(), "cannot clear write readiness");
269        assert!(!platform::is_hup(ready), "cannot clear HUP readiness");
270
271        self.inner
272            .read_readiness
273            .fetch_and(!ready.as_usize(), Relaxed);
274
275        if self.poll_read_ready(cx, ready)?.is_ready() {
276            // Notify the current task
277            cx.waker().wake_by_ref();
278        }
279
280        Ok(())
281    }
282
283    /// Check the I/O resource's write readiness state.
284    ///
285    /// This always checks for writable readiness and also checks for HUP
286    /// readiness on platforms that support it.
287    ///
288    /// If the resource is not ready for a write then `Async::NotReady` is
289    /// returned and the current task is notified once a new event is received.
290    ///
291    /// The I/O resource will remain in a write-ready state until readiness is
292    /// cleared by calling [`clear_write_ready`].
293    ///
294    /// [`clear_write_ready`]: #method.clear_write_ready
295    ///
296    /// # Panics
297    ///
298    /// This function panics if:
299    ///
300    /// * `ready` contains bits besides `writable` and `hup`.
301    /// * called from outside of a task context.
302    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
303        poll_ready!(
304            self,
305            mio::Ready::writable(),
306            write_readiness,
307            take_write_ready,
308            self.inner.registration.poll_write_ready(cx)
309        )
310    }
311
312    /// Resets the I/O resource's write readiness state and registers the current
313    /// task to be notified once a write readiness event is received.
314    ///
315    /// This only clears writable readiness. HUP (on platforms that support HUP)
316    /// cannot be cleared as it is a final state.
317    ///
318    /// After calling this function, `poll_write_ready(Ready::writable())` will
319    /// return `NotReady` until a new write readiness event has been received.
320    ///
321    /// # Panics
322    ///
323    /// This function will panic if called from outside of a task context.
324    pub fn clear_write_ready(&self, cx: &mut Context<'_>) -> io::Result<()> {
325        let ready = mio::Ready::writable();
326
327        self.inner
328            .write_readiness
329            .fetch_and(!ready.as_usize(), Relaxed);
330
331        if self.poll_write_ready(cx)?.is_ready() {
332            // Notify the current task
333            cx.waker().wake_by_ref();
334        }
335
336        Ok(())
337    }
338}
339
340// ===== Read / Write impls =====
341
342impl<E> AsyncRead for PollEvented<E>
343where
344    E: Evented + Read + Unpin,
345{
346    fn poll_read(
347        mut self: Pin<&mut Self>,
348        cx: &mut Context<'_>,
349        buf: &mut [u8],
350    ) -> Poll<io::Result<usize>> {
351        ready!(self.poll_read_ready(cx, mio::Ready::readable()))?;
352
353        let r = (*self).get_mut().read(buf);
354
355        if is_wouldblock(&r) {
356            self.clear_read_ready(cx, mio::Ready::readable())?;
357            return Poll::Pending;
358        }
359
360        Poll::Ready(r)
361    }
362}
363
364impl<E> AsyncWrite for PollEvented<E>
365where
366    E: Evented + Write + Unpin,
367{
368    fn poll_write(
369        mut self: Pin<&mut Self>,
370        cx: &mut Context<'_>,
371        buf: &[u8],
372    ) -> Poll<io::Result<usize>> {
373        ready!(self.poll_write_ready(cx))?;
374
375        let r = (*self).get_mut().write(buf);
376
377        if is_wouldblock(&r) {
378            self.clear_write_ready(cx)?;
379            return Poll::Pending;
380        }
381
382        Poll::Ready(r)
383    }
384
385    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
386        ready!(self.poll_write_ready(cx))?;
387
388        let r = (*self).get_mut().flush();
389
390        if is_wouldblock(&r) {
391            self.clear_write_ready(cx)?;
392            return Poll::Pending;
393        }
394
395        Poll::Ready(r)
396    }
397
398    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
399        Poll::Ready(Ok(()))
400    }
401}
402
/// Returns `true` only when `r` is an error whose kind is
/// `io::ErrorKind::WouldBlock`; successes and all other errors yield `false`.
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
    r.as_ref()
        .err()
        .map_or(false, |e| e.kind() == io::ErrorKind::WouldBlock)
}
409
410impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> {
411    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
412        f.debug_struct("PollEvented").field("io", &self.io).finish()
413    }
414}
415
416impl<E: Evented> Drop for PollEvented<E> {
417    fn drop(&mut self) {
418        if let Some(io) = self.io.take() {
419            // Ignore errors
420            let _ = self.inner.registration.deregister(&io);
421        }
422    }
423}