tokio_core/reactor/poll_evented.rs
//! Readiness tracking streams, backing I/O objects.
//!
//! This module contains the core type which is used to back all I/O objects
//! in `tokio-core`. The `PollEvented` type is an implementation detail of
//! all I/O. Each `PollEvented` manages registration with a reactor,
//! acquisition of a token, and tracking of the readiness state on the
//! underlying I/O primitive.
8
9use std::fmt;
10use std::io::{self, Read, Write};
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering::Relaxed;
13
14use futures::{task, Async, Poll};
15use mio::event::Evented;
16use mio::Ready;
17use tokio_io::{AsyncRead, AsyncWrite};
18use tokio::reactor::{Registration};
19
20use reactor::{Handle, Remote};
21
22/// A concrete implementation of a stream of readiness notifications for I/O
23/// objects that originates from an event loop.
24///
25/// Created by the `PollEvented::new` method, each `PollEvented` is
26/// associated with a specific event loop and source of events that will be
27/// registered with an event loop.
28///
29/// An instance of `PollEvented` is essentially the bridge between the `mio`
30/// world and the `tokio-core` world, providing abstractions to receive
31/// notifications about changes to an object's `mio::Ready` state.
32///
33/// Each readiness stream has a number of methods to test whether the underlying
34/// object is readable or writable. Once the methods return that an object is
35/// readable/writable, then it will continue to do so until the `need_read` or
36/// `need_write` methods are called.
37///
38/// That is, this object is typically wrapped in another form of I/O object.
39/// It's the responsibility of the wrapper to inform the readiness stream when a
40/// "would block" I/O event is seen. The readiness stream will then take care of
41/// any scheduling necessary to get notified when the event is ready again.
42///
43/// You can find more information about creating a custom I/O object [online].
44///
45/// [online]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/#custom-io
46///
47/// ## Readiness to read/write
48///
49/// A `PollEvented` allows listening and waiting for an arbitrary `mio::Ready`
50/// instance, including the platform-specific contents of `mio::Ready`. At most
51/// two future tasks, however, can be waiting on a `PollEvented`. The
52/// `need_read` and `need_write` methods can block two separate tasks, one on
53/// reading and one on writing. Not all I/O events correspond to read/write,
54/// however!
55///
56/// To account for this a `PollEvented` gets a little interesting when working
57/// with an arbitrary instance of `mio::Ready` that may not map precisely to
58/// "write" and "read" tasks. Currently it is defined that instances of
59/// `mio::Ready` that do *not* return true from `is_writable` are all notified
60/// through `need_read`, or the read task.
61///
62/// In other words, `poll_ready` with the `mio::UnixReady::hup` event will block
63/// the read task of this `PollEvented` if the `hup` event isn't available.
64/// Essentially a good rule of thumb is that if you're using the `poll_ready`
65/// method you want to also use `need_read` to signal blocking and you should
66/// otherwise probably avoid using two tasks on the same `PollEvented`.
pub struct PollEvented<E> {
    /// The wrapped I/O object; handed out through `get_ref` and `get_mut`.
    io: E,
    /// Reactor registration together with the cached read/write readiness.
    inner: Inner,
    /// Handle to the event loop, cloned from the `Handle` given to `new` and
    /// returned by `remote`.
    remote: Remote,
}
72
/// Registration and cached readiness state owned by a `PollEvented`.
struct Inner {
    /// Registration with the reactor; queried for new readiness events.
    registration: Registration,

    /// Currently visible read readiness, stored in the `usize` encoding
    /// produced by `ready2usize`. A value of 0 means nothing is cached.
    read_readiness: AtomicUsize,

    /// Currently visible write readiness, stored in the `usize` encoding
    /// produced by `ready2usize`. A value of 0 means nothing is cached.
    write_readiness: AtomicUsize,
}
82
83impl<E: Evented> PollEvented<E> {
84 /// Creates a new readiness stream associated with the provided
85 /// `loop_handle` and for the given `source`.
86 ///
87 /// This method returns a future which will resolve to the readiness stream
88 /// when it's ready.
89 pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>> {
90 let registration = Registration::new();
91 registration.register_with(&io, handle.new_tokio_handle())?;
92
93 Ok(PollEvented {
94 io: io,
95 inner: Inner {
96 registration,
97 read_readiness: AtomicUsize::new(0),
98 write_readiness: AtomicUsize::new(0),
99 },
100 remote: handle.remote().clone(),
101 })
102 }
103
104 /// Deregisters this source of events from the reactor core specified.
105 ///
106 /// This method can optionally be called to unregister the underlying I/O
107 /// object with the event loop that the `handle` provided points to.
108 /// Typically this method is not required as this automatically happens when
109 /// `E` is dropped, but for some use cases the `E` object doesn't represent
110 /// an owned reference, so dropping it won't automatically unregister with
111 /// the event loop.
112 ///
113 /// This consumes `self` as it will no longer provide events after the
114 /// method is called, and will likely return an error if this `PollEvented`
115 /// was created on a separate event loop from the `handle` specified.
116 pub fn deregister(self, _: &Handle) -> io::Result<()> {
117 // Nothing has to happen here anymore as I/O objects are explicitly
118 // deregistered before dropped.
119 Ok(())
120 }
121}
122
123impl<E> PollEvented<E> {
124 /// Tests to see if this source is ready to be read from or not.
125 ///
126 /// If this stream is not ready for a read then `NotReady` will be returned
127 /// and the current task will be scheduled to receive a notification when
128 /// the stream is readable again. In other words, this method is only safe
129 /// to call from within the context of a future's task, typically done in a
130 /// `Future::poll` method.
131 ///
132 /// This is mostly equivalent to `self.poll_ready(Ready::readable())`.
133 ///
134 /// # Panics
135 ///
136 /// This function will panic if called outside the context of a future's
137 /// task.
138 pub fn poll_read(&self) -> Async<()> {
139 if self.poll_read2().is_ready() {
140 return ().into();
141 }
142
143 Async::NotReady
144 }
145
146 fn poll_read2(&self) -> Async<Ready> {
147 // Load the cached readiness
148 match self.inner.read_readiness.load(Relaxed) {
149 0 => {}
150 mut n => {
151 // Check what's new with the reactor.
152 if let Some(ready) = self.inner.registration.take_read_ready().unwrap() {
153 n |= super::ready2usize(ready);
154 self.inner.read_readiness.store(n, Relaxed);
155 }
156
157 return super::usize2ready(n).into();
158 }
159 }
160
161 let ready = match self.inner.registration.poll_read_ready().unwrap() {
162 Async::Ready(r) => r,
163 _ => return Async::NotReady,
164 };
165
166 // Cache the value
167 self.inner.read_readiness.store(super::ready2usize(ready), Relaxed);
168
169 ready.into()
170 }
171
172 /// Tests to see if this source is ready to be written to or not.
173 ///
174 /// If this stream is not ready for a write then `NotReady` will be returned
175 /// and the current task will be scheduled to receive a notification when
176 /// the stream is writable again. In other words, this method is only safe
177 /// to call from within the context of a future's task, typically done in a
178 /// `Future::poll` method.
179 ///
180 /// This is mostly equivalent to `self.poll_ready(Ready::writable())`.
181 ///
182 /// # Panics
183 ///
184 /// This function will panic if called outside the context of a future's
185 /// task.
186 pub fn poll_write(&self) -> Async<()> {
187 match self.inner.write_readiness.load(Relaxed) {
188 0 => {}
189 mut n => {
190 // Check what's new with the reactor.
191 if let Some(ready) = self.inner.registration.take_write_ready().unwrap() {
192 n |= super::ready2usize(ready);
193 self.inner.write_readiness.store(n, Relaxed);
194 }
195
196 return ().into();
197 }
198 }
199
200 let ready = match self.inner.registration.poll_write_ready().unwrap() {
201 Async::Ready(r) => r,
202 _ => return Async::NotReady,
203 };
204
205 // Cache the value
206 self.inner.write_readiness.store(super::ready2usize(ready), Relaxed);
207
208 ().into()
209 }
210
211 /// Test to see whether this source fulfills any condition listed in `mask`
212 /// provided.
213 ///
214 /// The `mask` given here is a mio `Ready` set of possible events. This can
215 /// contain any events like read/write but also platform-specific events
216 /// such as hup and error. The `mask` indicates events that are interested
217 /// in being ready.
218 ///
219 /// If any event in `mask` is ready then it is returned through
220 /// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty
221 /// and contains all events that are currently ready in the `mask` provided.
222 ///
223 /// If no events are ready in the `mask` provided then the current task is
224 /// scheduled to receive a notification when any of them become ready. If
225 /// the `writable` event is contained within `mask` then this
226 /// `PollEvented`'s `write` task will be blocked and otherwise the `read`
227 /// task will be blocked. This is generally only relevant if you're working
228 /// with this `PollEvented` object on multiple tasks.
229 ///
230 /// # Panics
231 ///
232 /// This function will panic if called outside the context of a future's
233 /// task.
234 pub fn poll_ready(&self, mask: Ready) -> Async<Ready> {
235 let mut ret = Ready::empty();
236
237 if mask.is_empty() {
238 return ret.into();
239 }
240
241 if mask.is_writable() {
242 if self.poll_write().is_ready() {
243 ret = Ready::writable();
244 }
245 }
246
247 let mask = mask - Ready::writable();
248
249 if !mask.is_empty() {
250 if let Async::Ready(v) = self.poll_read2() {
251 ret |= v & mask;
252 }
253 }
254
255 if ret.is_empty() {
256 if mask.is_writable() {
257 self.need_write();
258 }
259
260 if mask.is_readable() {
261 self.need_read();
262 }
263
264 Async::NotReady
265 } else {
266 ret.into()
267 }
268 }
269
270 /// Indicates to this source of events that the corresponding I/O object is
271 /// no longer readable, but it needs to be.
272 ///
273 /// This function, like `poll_read`, is only safe to call from the context
274 /// of a future's task (typically in a `Future::poll` implementation). It
275 /// informs this readiness stream that the underlying object is no longer
276 /// readable, typically because a "would block" error was seen.
277 ///
278 /// *All* readiness bits associated with this stream except the writable bit
279 /// will be reset when this method is called. The current task is then
280 /// scheduled to receive a notification whenever anything changes other than
281 /// the writable bit. Note that this typically just means the readable bit
282 /// is used here, but if you're using a custom I/O object for events like
283 /// hup/error this may also be relevant.
284 ///
285 /// Note that it is also only valid to call this method if `poll_read`
286 /// previously indicated that the object is readable. That is, this function
287 /// must always be paired with calls to `poll_read` previously.
288 ///
289 /// # Panics
290 ///
291 /// This function will panic if called outside the context of a future's
292 /// task.
293 pub fn need_read(&self) {
294 self.inner.read_readiness.store(0, Relaxed);
295
296 if self.poll_read().is_ready() {
297 // Notify the current task
298 task::current().notify();
299 }
300 }
301
302 /// Indicates to this source of events that the corresponding I/O object is
303 /// no longer writable, but it needs to be.
304 ///
305 /// This function, like `poll_write`, is only safe to call from the context
306 /// of a future's task (typically in a `Future::poll` implementation). It
307 /// informs this readiness stream that the underlying object is no longer
308 /// writable, typically because a "would block" error was seen.
309 ///
310 /// The flag indicating that this stream is writable is unset and the
311 /// current task is scheduled to receive a notification when the stream is
312 /// then again writable.
313 ///
314 /// Note that it is also only valid to call this method if `poll_write`
315 /// previously indicated that the object is writable. That is, this function
316 /// must always be paired with calls to `poll_write` previously.
317 ///
318 /// # Panics
319 ///
320 /// This function will panic if called outside the context of a future's
321 /// task.
322 pub fn need_write(&self) {
323 self.inner.write_readiness.store(0, Relaxed);
324
325 if self.poll_write().is_ready() {
326 // Notify the current task
327 task::current().notify();
328 }
329 }
330
331 /// Returns a reference to the event loop handle that this readiness stream
332 /// is associated with.
333 pub fn remote(&self) -> &Remote {
334 &self.remote
335 }
336
337 /// Returns a shared reference to the underlying I/O object this readiness
338 /// stream is wrapping.
339 pub fn get_ref(&self) -> &E {
340 &self.io
341 }
342
343 /// Returns a mutable reference to the underlying I/O object this readiness
344 /// stream is wrapping.
345 pub fn get_mut(&mut self) -> &mut E {
346 &mut self.io
347 }
348}
349
350impl<E: Read> Read for PollEvented<E> {
351 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
352 if let Async::NotReady = PollEvented::poll_read(self) {
353 return Err(io::ErrorKind::WouldBlock.into())
354 }
355
356 let r = self.get_mut().read(buf);
357
358 if is_wouldblock(&r) {
359 self.need_read();
360 }
361
362 r
363 }
364}
365
366impl<E: Write> Write for PollEvented<E> {
367 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
368 if let Async::NotReady = PollEvented::poll_write(self) {
369 return Err(io::ErrorKind::WouldBlock.into())
370 }
371
372 let r = self.get_mut().write(buf);
373
374 if is_wouldblock(&r) {
375 self.need_write();
376 }
377
378 r
379 }
380
381 fn flush(&mut self) -> io::Result<()> {
382 if let Async::NotReady = PollEvented::poll_write(self) {
383 return Err(io::ErrorKind::WouldBlock.into())
384 }
385
386 let r = self.get_mut().flush();
387
388 if is_wouldblock(&r) {
389 self.need_write();
390 }
391
392 r
393 }
394}
395
// Marks `PollEvented<E>` as `AsyncRead` for any `E: Read`. The impl body is
// empty, so every trait method uses its default implementation.
impl<E: Read> AsyncRead for PollEvented<E> {
}
398
399impl<E: Write> AsyncWrite for PollEvented<E> {
400 fn shutdown(&mut self) -> Poll<(), io::Error> {
401 Ok(().into())
402 }
403}
404
405#[allow(deprecated)]
406impl<E: Read + Write> ::io::Io for PollEvented<E> {
407 fn poll_read(&mut self) -> Async<()> {
408 <PollEvented<E>>::poll_read(self)
409 }
410
411 fn poll_write(&mut self) -> Async<()> {
412 <PollEvented<E>>::poll_write(self)
413 }
414}
415
416impl<'a, E> Read for &'a PollEvented<E>
417 where &'a E: Read,
418{
419 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
420 if let Async::NotReady = PollEvented::poll_read(self) {
421 return Err(io::ErrorKind::WouldBlock.into())
422 }
423
424 let r = self.get_ref().read(buf);
425
426 if is_wouldblock(&r) {
427 self.need_read();
428 }
429
430 r
431 }
432}
433
434impl<'a, E> Write for &'a PollEvented<E>
435 where &'a E: Write,
436{
437 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
438 if let Async::NotReady = PollEvented::poll_write(self) {
439 return Err(io::ErrorKind::WouldBlock.into())
440 }
441
442 let r = self.get_ref().write(buf);
443
444 if is_wouldblock(&r) {
445 self.need_write();
446 }
447
448 r
449 }
450
451 fn flush(&mut self) -> io::Result<()> {
452 if let Async::NotReady = PollEvented::poll_write(self) {
453 return Err(io::ErrorKind::WouldBlock.into())
454 }
455
456 let r = self.get_ref().flush();
457
458 if is_wouldblock(&r) {
459 self.need_write();
460 }
461
462 r
463 }
464}
465
// Marks `&PollEvented<E>` as `AsyncRead` whenever `&E` is `Read`. The impl
// body is empty, so every trait method uses its default implementation.
impl<'a, E> AsyncRead for &'a PollEvented<E>
    where &'a E: Read,
{
}
470
471impl<'a, E> AsyncWrite for &'a PollEvented<E>
472 where &'a E: Write,
473{
474 fn shutdown(&mut self) -> Poll<(), io::Error> {
475 Ok(().into())
476 }
477}
478
479#[allow(deprecated)]
480impl<'a, E> ::io::Io for &'a PollEvented<E>
481 where &'a E: Read + Write,
482{
483 fn poll_read(&mut self) -> Async<()> {
484 <PollEvented<E>>::poll_read(self)
485 }
486
487 fn poll_write(&mut self) -> Async<()> {
488 <PollEvented<E>>::poll_write(self)
489 }
490}
491
/// Returns `true` only when `r` is an error whose kind is
/// `io::ErrorKind::WouldBlock`; `Ok` values and all other errors give `false`.
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
    r.as_ref()
        .err()
        .map_or(false, |e| e.kind() == io::ErrorKind::WouldBlock)
}
498
499impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> {
500 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
501 f.debug_struct("PollEvented")
502 .field("io", &self.io)
503 .finish()
504 }
505}