tokio_reactor/
poll_evented.rs

1use {Handle, Registration};
2
3use futures::{task, Async, Poll};
4use mio;
5use mio::event::Evented;
6use tokio_io::{AsyncRead, AsyncWrite};
7
8use std::fmt;
9use std::io::{self, Read, Write};
10use std::sync::atomic::AtomicUsize;
11use std::sync::atomic::Ordering::Relaxed;
12
13/// Associates an I/O resource that implements the [`std::io::Read`] and/or
14/// [`std::io::Write`] traits with the reactor that drives it.
15///
16/// `PollEvented` uses [`Registration`] internally to take a type that
17/// implements [`mio::Evented`] as well as [`std::io::Read`] and or
18/// [`std::io::Write`] and associate it with a reactor that will drive it.
19///
20/// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be
21/// used from within the future's execution model. As such, the `PollEvented`
22/// type provides [`AsyncRead`] and [`AsyncWrite`] implementations using the
23/// underlying I/O resource as well as readiness events provided by the reactor.
24///
25/// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is
26/// `Sync`), the caller must ensure that there are at most two tasks that use a
27/// `PollEvented` instance concurrently. One for reading and one for writing.
28/// While violating this requirement is "safe" from a Rust memory model point of
29/// view, it will result in unexpected behavior in the form of lost
30/// notifications and tasks hanging.
31///
32/// ## Readiness events
33///
34/// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations,
35/// this type also supports access to the underlying readiness event stream.
36/// While similar in function to what [`Registration`] provides, the semantics
37/// are a bit different.
38///
39/// Two functions are provided to access the readiness events:
40/// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the
41/// current readiness state of the `PollEvented` instance. If
42/// [`poll_read_ready`] indicates read readiness, immediately calling
43/// [`poll_read_ready`] again will also indicate read readiness.
44///
45/// When the operation is attempted and is unable to succeed due to the I/O
46/// resource not being ready, the caller must call [`clear_read_ready`] or
47/// [`clear_write_ready`]. This clears the readiness state until a new readiness
48/// event is received.
49///
50/// This allows the caller to implement additional functions. For example,
51/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
52/// [`clear_read_ready`].
53///
54/// ```rust,ignore
55/// pub fn poll_accept(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> {
56///     let ready = Ready::readable();
57///
58///     try_ready!(self.poll_evented.poll_read_ready(ready));
59///
60///     match self.poll_evented.get_ref().accept_std() {
61///         Ok(pair) => Ok(Async::Ready(pair)),
62///         Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
63///             self.poll_evented.clear_read_ready(ready);
64///             Ok(Async::NotReady)
65///         }
66///         Err(e) => Err(e),
67///     }
68/// }
69/// ```
70///
71/// ## Platform-specific events
72///
73/// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
74/// These events are included as part of the read readiness event stream. The
75/// write readiness event stream is only for `Ready::writable()` events.
76///
77/// [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
78/// [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
79/// [`AsyncRead`]: ../io/trait.AsyncRead.html
80/// [`AsyncWrite`]: ../io/trait.AsyncWrite.html
81/// [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html
82/// [`Registration`]: struct.Registration.html
83/// [`TcpListener`]: ../net/struct.TcpListener.html
84/// [`clear_read_ready`]: #method.clear_read_ready
85/// [`clear_write_ready`]: #method.clear_write_ready
86/// [`poll_read_ready`]: #method.poll_read_ready
87/// [`poll_write_ready`]: #method.poll_write_ready
88pub struct PollEvented<E: Evented> {
89    io: Option<E>,
90    inner: Inner,
91}
92
93struct Inner {
94    registration: Registration,
95
96    /// Currently visible read readiness
97    read_readiness: AtomicUsize,
98
99    /// Currently visible write readiness
100    write_readiness: AtomicUsize,
101}
102
103// ===== impl PollEvented =====
104
105macro_rules! poll_ready {
106    ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{
107        $me.register()?;
108
109        // Load cached & encoded readiness.
110        let mut cached = $me.inner.$cache.load(Relaxed);
111        let mask = $mask | ::platform::hup();
112
113        // See if the current readiness matches any bits.
114        let mut ret = mio::Ready::from_usize(cached) & $mask;
115
116        if ret.is_empty() {
117            // Readiness does not match, consume the registration's readiness
118            // stream. This happens in a loop to ensure that the stream gets
119            // drained.
120            loop {
121                let ready = try_ready!($poll);
122                cached |= ready.as_usize();
123
124                // Update the cache store
125                $me.inner.$cache.store(cached, Relaxed);
126
127                ret |= ready & mask;
128
129                if !ret.is_empty() {
130                    return Ok(ret.into());
131                }
132            }
133        } else {
134            // Check what's new with the registration stream. This will not
135            // request to be notified
136            if let Some(ready) = $me.inner.registration.$take()? {
137                cached |= ready.as_usize();
138                $me.inner.$cache.store(cached, Relaxed);
139            }
140
141            Ok(mio::Ready::from_usize(cached).into())
142        }
143    }};
144}
145
146impl<E> PollEvented<E>
147where
148    E: Evented,
149{
150    /// Creates a new `PollEvented` associated with the default reactor.
151    pub fn new(io: E) -> PollEvented<E> {
152        PollEvented {
153            io: Some(io),
154            inner: Inner {
155                registration: Registration::new(),
156                read_readiness: AtomicUsize::new(0),
157                write_readiness: AtomicUsize::new(0),
158            },
159        }
160    }
161
162    /// Creates a new `PollEvented` associated with the specified reactor.
163    pub fn new_with_handle(io: E, handle: &Handle) -> io::Result<Self> {
164        let ret = PollEvented::new(io);
165
166        if let Some(handle) = handle.as_priv() {
167            ret.inner
168                .registration
169                .register_with_priv(ret.io.as_ref().unwrap(), handle)?;
170        }
171
172        Ok(ret)
173    }
174
175    /// Returns a shared reference to the underlying I/O object this readiness
176    /// stream is wrapping.
177    pub fn get_ref(&self) -> &E {
178        self.io.as_ref().unwrap()
179    }
180
181    /// Returns a mutable reference to the underlying I/O object this readiness
182    /// stream is wrapping.
183    pub fn get_mut(&mut self) -> &mut E {
184        self.io.as_mut().unwrap()
185    }
186
187    /// Consumes self, returning the inner I/O object
188    ///
189    /// This function will deregister the I/O resource from the reactor before
190    /// returning. If the deregistration operation fails, an error is returned.
191    ///
192    /// Note that deregistering does not guarantee that the I/O resource can be
193    /// registered with a different reactor. Some I/O resource types can only be
194    /// associated with a single reactor instance for their lifetime.
195    pub fn into_inner(mut self) -> io::Result<E> {
196        let io = self.io.take().unwrap();
197        self.inner.registration.deregister(&io)?;
198        Ok(io)
199    }
200
201    /// Check the I/O resource's read readiness state.
202    ///
203    /// The mask argument allows specifying what readiness to notify on. This
204    /// can be any value, including platform specific readiness, **except**
205    /// `writable`. HUP is always implicitly included on platforms that support
206    /// it.
207    ///
208    /// If the resource is not ready for a read then `Async::NotReady` is
209    /// returned and the current task is notified once a new event is received.
210    ///
211    /// The I/O resource will remain in a read-ready state until readiness is
212    /// cleared by calling [`clear_read_ready`].
213    ///
214    /// [`clear_read_ready`]: #method.clear_read_ready
215    ///
216    /// # Panics
217    ///
218    /// This function panics if:
219    ///
220    /// * `ready` includes writable.
221    /// * called from outside of a task context.
222    pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> {
223        assert!(!mask.is_writable(), "cannot poll for write readiness");
224        poll_ready!(
225            self,
226            mask,
227            read_readiness,
228            take_read_ready,
229            self.inner.registration.poll_read_ready()
230        )
231    }
232
233    /// Clears the I/O resource's read readiness state and registers the current
234    /// task to be notified once a read readiness event is received.
235    ///
236    /// After calling this function, `poll_read_ready` will return `NotReady`
237    /// until a new read readiness event has been received.
238    ///
239    /// The `mask` argument specifies the readiness bits to clear. This may not
240    /// include `writable` or `hup`.
241    ///
242    /// # Panics
243    ///
244    /// This function panics if:
245    ///
246    /// * `ready` includes writable or HUP
247    /// * called from outside of a task context.
248    pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> {
249        // Cannot clear write readiness
250        assert!(!ready.is_writable(), "cannot clear write readiness");
251        assert!(!::platform::is_hup(&ready), "cannot clear HUP readiness");
252
253        self.inner
254            .read_readiness
255            .fetch_and(!ready.as_usize(), Relaxed);
256
257        if self.poll_read_ready(ready)?.is_ready() {
258            // Notify the current task
259            task::current().notify();
260        }
261
262        Ok(())
263    }
264
265    /// Check the I/O resource's write readiness state.
266    ///
267    /// This always checks for writable readiness and also checks for HUP
268    /// readiness on platforms that support it.
269    ///
270    /// If the resource is not ready for a write then `Async::NotReady` is
271    /// returned and the current task is notified once a new event is received.
272    ///
273    /// The I/O resource will remain in a write-ready state until readiness is
274    /// cleared by calling [`clear_write_ready`].
275    ///
276    /// [`clear_write_ready`]: #method.clear_write_ready
277    ///
278    /// # Panics
279    ///
280    /// This function panics if:
281    ///
282    /// * `ready` contains bits besides `writable` and `hup`.
283    /// * called from outside of a task context.
284    pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
285        poll_ready!(
286            self,
287            mio::Ready::writable(),
288            write_readiness,
289            take_write_ready,
290            self.inner.registration.poll_write_ready()
291        )
292    }
293
294    /// Resets the I/O resource's write readiness state and registers the current
295    /// task to be notified once a write readiness event is received.
296    ///
297    /// This only clears writable readiness. HUP (on platforms that support HUP)
298    /// cannot be cleared as it is a final state.
299    ///
300    /// After calling this function, `poll_write_ready(Ready::writable())` will
301    /// return `NotReady` until a new write readiness event has been received.
302    ///
303    /// # Panics
304    ///
305    /// This function will panic if called from outside of a task context.
306    pub fn clear_write_ready(&self) -> io::Result<()> {
307        let ready = mio::Ready::writable();
308
309        self.inner
310            .write_readiness
311            .fetch_and(!ready.as_usize(), Relaxed);
312
313        if self.poll_write_ready()?.is_ready() {
314            // Notify the current task
315            task::current().notify();
316        }
317
318        Ok(())
319    }
320
321    /// Ensure that the I/O resource is registered with the reactor.
322    fn register(&self) -> io::Result<()> {
323        self.inner
324            .registration
325            .register(self.io.as_ref().unwrap())?;
326        Ok(())
327    }
328}
329
330// ===== Read / Write impls =====
331
332impl<E> Read for PollEvented<E>
333where
334    E: Evented + Read,
335{
336    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
337        if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? {
338            return Err(io::ErrorKind::WouldBlock.into());
339        }
340
341        let r = self.get_mut().read(buf);
342
343        if is_wouldblock(&r) {
344            self.clear_read_ready(mio::Ready::readable())?;
345        }
346
347        return r;
348    }
349}
350
351impl<E> Write for PollEvented<E>
352where
353    E: Evented + Write,
354{
355    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
356        if let Async::NotReady = self.poll_write_ready()? {
357            return Err(io::ErrorKind::WouldBlock.into());
358        }
359
360        let r = self.get_mut().write(buf);
361
362        if is_wouldblock(&r) {
363            self.clear_write_ready()?;
364        }
365
366        return r;
367    }
368
369    fn flush(&mut self) -> io::Result<()> {
370        if let Async::NotReady = self.poll_write_ready()? {
371            return Err(io::ErrorKind::WouldBlock.into());
372        }
373
374        let r = self.get_mut().flush();
375
376        if is_wouldblock(&r) {
377            self.clear_write_ready()?;
378        }
379
380        return r;
381    }
382}
383
384impl<E> AsyncRead for PollEvented<E> where E: Evented + Read {}
385
386impl<E> AsyncWrite for PollEvented<E>
387where
388    E: Evented + Write,
389{
390    fn shutdown(&mut self) -> Poll<(), io::Error> {
391        Ok(().into())
392    }
393}
394
395// ===== &'a Read / &'a Write impls =====
396
397impl<'a, E> Read for &'a PollEvented<E>
398where
399    E: Evented,
400    &'a E: Read,
401{
402    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
403        if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? {
404            return Err(io::ErrorKind::WouldBlock.into());
405        }
406
407        let r = self.get_ref().read(buf);
408
409        if is_wouldblock(&r) {
410            self.clear_read_ready(mio::Ready::readable())?;
411        }
412
413        return r;
414    }
415}
416
417impl<'a, E> Write for &'a PollEvented<E>
418where
419    E: Evented,
420    &'a E: Write,
421{
422    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
423        if let Async::NotReady = self.poll_write_ready()? {
424            return Err(io::ErrorKind::WouldBlock.into());
425        }
426
427        let r = self.get_ref().write(buf);
428
429        if is_wouldblock(&r) {
430            self.clear_write_ready()?;
431        }
432
433        return r;
434    }
435
436    fn flush(&mut self) -> io::Result<()> {
437        if let Async::NotReady = self.poll_write_ready()? {
438            return Err(io::ErrorKind::WouldBlock.into());
439        }
440
441        let r = self.get_ref().flush();
442
443        if is_wouldblock(&r) {
444            self.clear_write_ready()?;
445        }
446
447        return r;
448    }
449}
450
451impl<'a, E> AsyncRead for &'a PollEvented<E>
452where
453    E: Evented,
454    &'a E: Read,
455{
456}
457
458impl<'a, E> AsyncWrite for &'a PollEvented<E>
459where
460    E: Evented,
461    &'a E: Write,
462{
463    fn shutdown(&mut self) -> Poll<(), io::Error> {
464        Ok(().into())
465    }
466}
467
468fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
469    match *r {
470        Ok(_) => false,
471        Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
472    }
473}
474
475impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> {
476    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
477        f.debug_struct("PollEvented").field("io", &self.io).finish()
478    }
479}
480
481impl<E: Evented> Drop for PollEvented<E> {
482    fn drop(&mut self) {
483        if let Some(io) = self.io.take() {
484            // Ignore errors
485            let _ = self.inner.registration.deregister(&io);
486        }
487    }
488}