tokio_core/reactor/
poll_evented.rs

//! Readiness tracking streams, backing I/O objects.
//!
//! This module contains the core type which is used to back all I/O objects
//! in `tokio-core`. The `PollEvented` type is the implementation detail of
//! all I/O. Each `PollEvented` manages registration with a reactor,
//! acquisition of a token, and tracking of the readiness state on the
//! underlying I/O primitive.
8
9use std::fmt;
10use std::io::{self, Read, Write};
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering::Relaxed;
13
14use futures::{task, Async, Poll};
15use mio::event::Evented;
16use mio::Ready;
17use tokio_io::{AsyncRead, AsyncWrite};
18use tokio::reactor::{Registration};
19
20use reactor::{Handle, Remote};
21
22/// A concrete implementation of a stream of readiness notifications for I/O
23/// objects that originates from an event loop.
24///
25/// Created by the `PollEvented::new` method, each `PollEvented` is
26/// associated with a specific event loop and source of events that will be
27/// registered with an event loop.
28///
29/// An instance of `PollEvented` is essentially the bridge between the `mio`
30/// world and the `tokio-core` world, providing abstractions to receive
31/// notifications about changes to an object's `mio::Ready` state.
32///
33/// Each readiness stream has a number of methods to test whether the underlying
34/// object is readable or writable. Once the methods return that an object is
35/// readable/writable, then it will continue to do so until the `need_read` or
36/// `need_write` methods are called.
37///
38/// That is, this object is typically wrapped in another form of I/O object.
39/// It's the responsibility of the wrapper to inform the readiness stream when a
40/// "would block" I/O event is seen. The readiness stream will then take care of
41/// any scheduling necessary to get notified when the event is ready again.
42///
43/// You can find more information about creating a custom I/O object [online].
44///
45/// [online]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/#custom-io
46///
47/// ## Readiness to read/write
48///
49/// A `PollEvented` allows listening and waiting for an arbitrary `mio::Ready`
50/// instance, including the platform-specific contents of `mio::Ready`. At most
51/// two future tasks, however, can be waiting on a `PollEvented`. The
52/// `need_read` and `need_write` methods can block two separate tasks, one on
53/// reading and one on writing. Not all I/O events correspond to read/write,
54/// however!
55///
56/// To account for this a `PollEvented` gets a little interesting when working
57/// with an arbitrary instance of `mio::Ready` that may not map precisely to
58/// "write" and "read" tasks. Currently it is defined that instances of
59/// `mio::Ready` that do *not* return true from `is_writable` are all notified
60/// through `need_read`, or the read task.
61///
62/// In other words, `poll_ready` with the `mio::UnixReady::hup` event will block
63/// the read task of this `PollEvented` if the `hup` event isn't available.
64/// Essentially a good rule of thumb is that if you're using the `poll_ready`
65/// method you want to also use `need_read` to signal blocking and you should
66/// otherwise probably avoid using two tasks on the same `PollEvented`.
67pub struct PollEvented<E> {
68    io: E,
69    inner: Inner,
70    remote: Remote,
71}
72
73struct Inner {
74    registration: Registration,
75
76    /// Currently visible read readiness
77    read_readiness: AtomicUsize,
78
79    /// Currently visible write readiness
80    write_readiness: AtomicUsize,
81}
82
83impl<E: Evented> PollEvented<E> {
84    /// Creates a new readiness stream associated with the provided
85    /// `loop_handle` and for the given `source`.
86    ///
87    /// This method returns a future which will resolve to the readiness stream
88    /// when it's ready.
89    pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>> {
90        let registration = Registration::new();
91        registration.register_with(&io, handle.new_tokio_handle())?;
92
93        Ok(PollEvented {
94            io: io,
95            inner: Inner {
96                registration,
97                read_readiness: AtomicUsize::new(0),
98                write_readiness: AtomicUsize::new(0),
99            },
100            remote: handle.remote().clone(),
101        })
102    }
103
104    /// Deregisters this source of events from the reactor core specified.
105    ///
106    /// This method can optionally be called to unregister the underlying I/O
107    /// object with the event loop that the `handle` provided points to.
108    /// Typically this method is not required as this automatically happens when
109    /// `E` is dropped, but for some use cases the `E` object doesn't represent
110    /// an owned reference, so dropping it won't automatically unregister with
111    /// the event loop.
112    ///
113    /// This consumes `self` as it will no longer provide events after the
114    /// method is called, and will likely return an error if this `PollEvented`
115    /// was created on a separate event loop from the `handle` specified.
116    pub fn deregister(self, _: &Handle) -> io::Result<()> {
117        // Nothing has to happen here anymore as I/O objects are explicitly
118        // deregistered before dropped.
119        Ok(())
120    }
121}
122
123impl<E> PollEvented<E> {
124    /// Tests to see if this source is ready to be read from or not.
125    ///
126    /// If this stream is not ready for a read then `NotReady` will be returned
127    /// and the current task will be scheduled to receive a notification when
128    /// the stream is readable again. In other words, this method is only safe
129    /// to call from within the context of a future's task, typically done in a
130    /// `Future::poll` method.
131    ///
132    /// This is mostly equivalent to `self.poll_ready(Ready::readable())`.
133    ///
134    /// # Panics
135    ///
136    /// This function will panic if called outside the context of a future's
137    /// task.
138    pub fn poll_read(&self) -> Async<()> {
139        if self.poll_read2().is_ready() {
140            return ().into();
141        }
142
143        Async::NotReady
144    }
145
146    fn poll_read2(&self) -> Async<Ready> {
147        // Load the cached readiness
148        match self.inner.read_readiness.load(Relaxed) {
149            0 => {}
150            mut n => {
151                // Check what's new with the reactor.
152                if let Some(ready) = self.inner.registration.take_read_ready().unwrap() {
153                    n |= super::ready2usize(ready);
154                    self.inner.read_readiness.store(n, Relaxed);
155                }
156
157                return super::usize2ready(n).into();
158            }
159        }
160
161        let ready = match self.inner.registration.poll_read_ready().unwrap() {
162            Async::Ready(r) => r,
163            _ => return Async::NotReady,
164        };
165
166        // Cache the value
167        self.inner.read_readiness.store(super::ready2usize(ready), Relaxed);
168
169        ready.into()
170    }
171
172    /// Tests to see if this source is ready to be written to or not.
173    ///
174    /// If this stream is not ready for a write then `NotReady` will be returned
175    /// and the current task will be scheduled to receive a notification when
176    /// the stream is writable again. In other words, this method is only safe
177    /// to call from within the context of a future's task, typically done in a
178    /// `Future::poll` method.
179    ///
180    /// This is mostly equivalent to `self.poll_ready(Ready::writable())`.
181    ///
182    /// # Panics
183    ///
184    /// This function will panic if called outside the context of a future's
185    /// task.
186    pub fn poll_write(&self) -> Async<()> {
187        match self.inner.write_readiness.load(Relaxed) {
188            0 => {}
189            mut n => {
190                // Check what's new with the reactor.
191                if let Some(ready) = self.inner.registration.take_write_ready().unwrap() {
192                    n |= super::ready2usize(ready);
193                    self.inner.write_readiness.store(n, Relaxed);
194                }
195
196                return ().into();
197            }
198        }
199
200        let ready = match self.inner.registration.poll_write_ready().unwrap() {
201            Async::Ready(r) => r,
202            _ => return Async::NotReady,
203        };
204
205        // Cache the value
206        self.inner.write_readiness.store(super::ready2usize(ready), Relaxed);
207
208        ().into()
209    }
210
211    /// Test to see whether this source fulfills any condition listed in `mask`
212    /// provided.
213    ///
214    /// The `mask` given here is a mio `Ready` set of possible events. This can
215    /// contain any events like read/write but also platform-specific events
216    /// such as hup and error. The `mask` indicates events that are interested
217    /// in being ready.
218    ///
219    /// If any event in `mask` is ready then it is returned through
220    /// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty
221    /// and contains all events that are currently ready in the `mask` provided.
222    ///
223    /// If no events are ready in the `mask` provided then the current task is
224    /// scheduled to receive a notification when any of them become ready. If
225    /// the `writable` event is contained within `mask` then this
226    /// `PollEvented`'s `write` task will be blocked and otherwise the `read`
227    /// task will be blocked. This is generally only relevant if you're working
228    /// with this `PollEvented` object on multiple tasks.
229    ///
230    /// # Panics
231    ///
232    /// This function will panic if called outside the context of a future's
233    /// task.
234    pub fn poll_ready(&self, mask: Ready) -> Async<Ready> {
235        let mut ret = Ready::empty();
236
237        if mask.is_empty() {
238            return ret.into();
239        }
240
241        if mask.is_writable() {
242            if self.poll_write().is_ready() {
243                ret = Ready::writable();
244            }
245        }
246
247        let mask = mask - Ready::writable();
248
249        if !mask.is_empty() {
250            if let Async::Ready(v) = self.poll_read2() {
251                ret |= v & mask;
252            }
253        }
254
255        if ret.is_empty() {
256            if mask.is_writable() {
257                self.need_write();
258            }
259
260            if mask.is_readable() {
261                self.need_read();
262            }
263
264            Async::NotReady
265        } else {
266            ret.into()
267        }
268    }
269
270    /// Indicates to this source of events that the corresponding I/O object is
271    /// no longer readable, but it needs to be.
272    ///
273    /// This function, like `poll_read`, is only safe to call from the context
274    /// of a future's task (typically in a `Future::poll` implementation). It
275    /// informs this readiness stream that the underlying object is no longer
276    /// readable, typically because a "would block" error was seen.
277    ///
278    /// *All* readiness bits associated with this stream except the writable bit
279    /// will be reset when this method is called. The current task is then
280    /// scheduled to receive a notification whenever anything changes other than
281    /// the writable bit. Note that this typically just means the readable bit
282    /// is used here, but if you're using a custom I/O object for events like
283    /// hup/error this may also be relevant.
284    ///
285    /// Note that it is also only valid to call this method if `poll_read`
286    /// previously indicated that the object is readable. That is, this function
287    /// must always be paired with calls to `poll_read` previously.
288    ///
289    /// # Panics
290    ///
291    /// This function will panic if called outside the context of a future's
292    /// task.
293    pub fn need_read(&self) {
294        self.inner.read_readiness.store(0, Relaxed);
295
296        if self.poll_read().is_ready() {
297            // Notify the current task
298            task::current().notify();
299        }
300    }
301
302    /// Indicates to this source of events that the corresponding I/O object is
303    /// no longer writable, but it needs to be.
304    ///
305    /// This function, like `poll_write`, is only safe to call from the context
306    /// of a future's task (typically in a `Future::poll` implementation). It
307    /// informs this readiness stream that the underlying object is no longer
308    /// writable, typically because a "would block" error was seen.
309    ///
310    /// The flag indicating that this stream is writable is unset and the
311    /// current task is scheduled to receive a notification when the stream is
312    /// then again writable.
313    ///
314    /// Note that it is also only valid to call this method if `poll_write`
315    /// previously indicated that the object is writable. That is, this function
316    /// must always be paired with calls to `poll_write` previously.
317    ///
318    /// # Panics
319    ///
320    /// This function will panic if called outside the context of a future's
321    /// task.
322    pub fn need_write(&self) {
323        self.inner.write_readiness.store(0, Relaxed);
324
325        if self.poll_write().is_ready() {
326            // Notify the current task
327            task::current().notify();
328        }
329    }
330
331    /// Returns a reference to the event loop handle that this readiness stream
332    /// is associated with.
333    pub fn remote(&self) -> &Remote {
334        &self.remote
335    }
336
337    /// Returns a shared reference to the underlying I/O object this readiness
338    /// stream is wrapping.
339    pub fn get_ref(&self) -> &E {
340        &self.io
341    }
342
343    /// Returns a mutable reference to the underlying I/O object this readiness
344    /// stream is wrapping.
345    pub fn get_mut(&mut self) -> &mut E {
346        &mut self.io
347    }
348}
349
350impl<E: Read> Read for PollEvented<E> {
351    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
352        if let Async::NotReady = PollEvented::poll_read(self) {
353            return Err(io::ErrorKind::WouldBlock.into())
354        }
355
356        let r = self.get_mut().read(buf);
357
358        if is_wouldblock(&r) {
359            self.need_read();
360        }
361
362        r
363    }
364}
365
366impl<E: Write> Write for PollEvented<E> {
367    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
368        if let Async::NotReady = PollEvented::poll_write(self) {
369            return Err(io::ErrorKind::WouldBlock.into())
370        }
371
372        let r = self.get_mut().write(buf);
373
374        if is_wouldblock(&r) {
375            self.need_write();
376        }
377
378        r
379    }
380
381    fn flush(&mut self) -> io::Result<()> {
382        if let Async::NotReady = PollEvented::poll_write(self) {
383            return Err(io::ErrorKind::WouldBlock.into())
384        }
385
386        let r = self.get_mut().flush();
387
388        if is_wouldblock(&r) {
389            self.need_write();
390        }
391
392        r
393    }
394}
395
396impl<E: Read> AsyncRead for PollEvented<E> {
397}
398
399impl<E: Write> AsyncWrite for PollEvented<E> {
400    fn shutdown(&mut self) -> Poll<(), io::Error> {
401        Ok(().into())
402    }
403}
404
405#[allow(deprecated)]
406impl<E: Read + Write> ::io::Io for PollEvented<E> {
407    fn poll_read(&mut self) -> Async<()> {
408        <PollEvented<E>>::poll_read(self)
409    }
410
411    fn poll_write(&mut self) -> Async<()> {
412        <PollEvented<E>>::poll_write(self)
413    }
414}
415
416impl<'a, E> Read for &'a PollEvented<E>
417    where &'a E: Read,
418{
419    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
420        if let Async::NotReady = PollEvented::poll_read(self) {
421            return Err(io::ErrorKind::WouldBlock.into())
422        }
423
424        let r = self.get_ref().read(buf);
425
426        if is_wouldblock(&r) {
427            self.need_read();
428        }
429
430        r
431    }
432}
433
434impl<'a, E> Write for &'a PollEvented<E>
435    where &'a E: Write,
436{
437    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
438        if let Async::NotReady = PollEvented::poll_write(self) {
439            return Err(io::ErrorKind::WouldBlock.into())
440        }
441
442        let r = self.get_ref().write(buf);
443
444        if is_wouldblock(&r) {
445            self.need_write();
446        }
447
448        r
449    }
450
451    fn flush(&mut self) -> io::Result<()> {
452        if let Async::NotReady = PollEvented::poll_write(self) {
453            return Err(io::ErrorKind::WouldBlock.into())
454        }
455
456        let r = self.get_ref().flush();
457
458        if is_wouldblock(&r) {
459            self.need_write();
460        }
461
462        r
463    }
464}
465
466impl<'a, E> AsyncRead for &'a PollEvented<E>
467    where &'a E: Read,
468{
469}
470
471impl<'a, E> AsyncWrite for &'a PollEvented<E>
472    where &'a E: Write,
473{
474    fn shutdown(&mut self) -> Poll<(), io::Error> {
475        Ok(().into())
476    }
477}
478
479#[allow(deprecated)]
480impl<'a, E> ::io::Io for &'a PollEvented<E>
481    where &'a E: Read + Write,
482{
483    fn poll_read(&mut self) -> Async<()> {
484        <PollEvented<E>>::poll_read(self)
485    }
486
487    fn poll_write(&mut self) -> Async<()> {
488        <PollEvented<E>>::poll_write(self)
489    }
490}
491
/// Returns `true` only when `r` is an error of kind `WouldBlock`; successful
/// results and every other error kind yield `false`.
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
    r.as_ref()
        .err()
        .map_or(false, |err| err.kind() == io::ErrorKind::WouldBlock)
}
498
499impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> {
500    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
501        f.debug_struct("PollEvented")
502         .field("io", &self.io)
503         .finish()
504    }
505}