tokio_reactor/registration.rs
use {Direction, Handle, HandlePriv, Task};

use futures::{task, Async, Poll};
use mio::{self, Evented};

use std::cell::UnsafeCell;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::{io, ptr, usize};

/// Associates an I/O resource with the reactor instance that drives it.
///
/// A registration represents an I/O resource registered with a Reactor such
/// that it will receive task notifications on readiness. This is the lowest
/// level API for integrating with a reactor.
///
/// The association between an I/O resource and a reactor is made by calling
/// [`register`]. Once the association is established, it remains established
/// until the registration instance is dropped. Subsequent calls to
/// [`register`] are no-ops.
///
/// A registration instance represents two separate readiness streams. One for
/// read readiness and one for write readiness. These streams are independent
/// and can be consumed from separate tasks.
///
/// **Note**: while `Registration` is `Sync`, the caller must ensure that there
/// are at most two tasks that use a registration instance concurrently. One
/// task for [`poll_read_ready`] and one task for [`poll_write_ready`]. While
/// violating this requirement is "safe" from a Rust memory safety point of
/// view, it will result in unexpected behavior in the form of lost
/// notifications and tasks hanging.
///
/// ## Platform-specific events
///
/// `Registration` also allows receiving platform-specific `mio::Ready` events.
/// These events are included as part of the read readiness event stream. The
/// write readiness event stream is only for `Ready::writable()` events.
///
/// [`register`]: #method.register
/// [`poll_read_ready`]: #method.poll_read_ready
/// [`poll_write_ready`]: #method.poll_write_ready
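///
/// # Examples
///
/// A minimal sketch of associating a `mio` 0.6 socket with the default
/// reactor. It assumes a reactor is available in the current execution
/// context (for example, inside a Tokio runtime):
///
/// ```no_run
/// use tokio_reactor::Registration;
/// use mio::net::TcpListener;
///
/// let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
///
/// let registration = Registration::new();
///
/// // The first call establishes the association; repeated calls are no-ops.
/// assert!(registration.register(&listener).unwrap());
/// assert!(!registration.register(&listener).unwrap());
/// ```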
#[derive(Debug)]
pub struct Registration {
    /// Stores the handle. Once set, the value is not changed.
    ///
    /// Setting this requires acquiring the lock from state.
    inner: UnsafeCell<Option<Inner>>,

    /// Tracks the state of the registration.
    ///
    /// The least significant 2 bits are used to track the lifecycle of the
    /// registration. The rest of the `state` variable is a pointer to tasks
    /// that must be notified once the lock is released.
    state: AtomicUsize,
}

#[derive(Debug)]
struct Inner {
    handle: HandlePriv,
    token: usize,
}

#[derive(PartialEq)]
enum Notify {
    Yes,
    No,
}

/// Tasks waiting on readiness notifications.
#[derive(Debug)]
struct Node {
    direction: Direction,
    task: Task,
    next: *mut Node,
}

/// Initial state. The handle is not set and the registration is idle.
const INIT: usize = 0;

/// A thread locked the state and will associate a handle.
const LOCKED: usize = 1;

/// A handle has been associated with the registration.
const READY: usize = 2;

/// Masks the lifecycle state.
const LIFECYCLE_MASK: usize = 0b11;

/// A fake token used to identify error situations.
const ERROR: usize = usize::MAX;
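
// For illustration only (a sketch, not used by the code below): a `state`
// word packs the head of the waiter list together with the two lifecycle
// bits, e.g.
//
//     let packed = (node_ptr as usize) | LOCKED;
//     let lifecycle = packed & LIFECYCLE_MASK;            // LOCKED
//     let head = (packed & !LIFECYCLE_MASK) as *mut Node; // node_ptr
//
// This relies on `Box<Node>` allocations being aligned to at least 4 bytes,
// which keeps the two low bits of the pointer clear for the lifecycle state.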

// ===== impl Registration =====

impl Registration {
    /// Create a new `Registration`.
    ///
    /// This registration is not associated with a Reactor instance. Call
    /// `register` to establish the association.
    pub fn new() -> Registration {
        Registration {
            inner: UnsafeCell::new(None),
            state: AtomicUsize::new(INIT),
        }
    }

    /// Register the I/O resource with the default reactor.
    ///
    /// This function is safe to call concurrently and repeatedly. However,
    /// only the first call will establish the registration. Subsequent calls
    /// will be no-ops.
    ///
    /// # Return
    ///
    /// If the registration happened successfully, `Ok(true)` is returned.
    ///
    /// If an I/O resource has previously been successfully registered,
    /// `Ok(false)` is returned.
    ///
    /// If an error is encountered during registration, `Err` is returned.
    pub fn register<T>(&self, io: &T) -> io::Result<bool>
    where
        T: Evented,
    {
        self.register2(io, || HandlePriv::try_current())
    }

    /// Deregister the I/O resource from the reactor it is associated with.
    ///
    /// This function must be called before the I/O resource associated with
    /// the registration is dropped.
    ///
    /// Note that deregistering does not guarantee that the I/O resource can be
    /// registered with a different reactor. Some I/O resource types can only
    /// be associated with a single reactor instance for their lifetime.
    ///
    /// # Return
    ///
    /// If the deregistration was successful, `Ok` is returned. Any calls to
    /// `Reactor::turn` that happen after a successful call to `deregister`
    /// will no longer result in notifications getting sent for this
    /// registration.
    ///
    /// `Err` is returned if an error is encountered.
    pub fn deregister<T>(&mut self, io: &T) -> io::Result<()>
    where
        T: Evented,
    {
        // The state does not need to be checked and coordination is not
        // necessary as this function takes `&mut self`. This guarantees a
        // single thread is accessing the instance.
        if let Some(inner) = unsafe { (*self.inner.get()).as_ref() } {
            inner.deregister(io)?;
        }

        Ok(())
    }

    /// Register the I/O resource with the specified reactor.
    ///
    /// This function is safe to call concurrently and repeatedly. However,
    /// only the first call will establish the registration. Subsequent calls
    /// will be no-ops.
    ///
    /// If the registration happened successfully, `Ok(true)` is returned.
    ///
    /// If an I/O resource has previously been successfully registered,
    /// `Ok(false)` is returned.
    ///
    /// If an error is encountered during registration, `Err` is returned.
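    ///
    /// # Examples
    ///
    /// A sketch of registering with an explicitly constructed reactor rather
    /// than the default one (assuming `tokio_reactor::Reactor` and a `mio`
    /// 0.6 socket):
    ///
    /// ```no_run
    /// use tokio_reactor::{Reactor, Registration};
    /// use mio::net::TcpListener;
    ///
    /// let reactor = Reactor::new().unwrap();
    /// let handle = reactor.handle();
    ///
    /// let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
    ///
    /// let registration = Registration::new();
    /// registration.register_with(&listener, &handle).unwrap();
    /// ```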
    pub fn register_with<T>(&self, io: &T, handle: &Handle) -> io::Result<bool>
    where
        T: Evented,
    {
        self.register2(io, || match handle.as_priv() {
            Some(handle) => Ok(handle.clone()),
            None => HandlePriv::try_current(),
        })
    }

    pub(crate) fn register_with_priv<T>(&self, io: &T, handle: &HandlePriv) -> io::Result<bool>
    where
        T: Evented,
    {
        self.register2(io, || Ok(handle.clone()))
    }

    fn register2<T, F>(&self, io: &T, f: F) -> io::Result<bool>
    where
        T: Evented,
        F: Fn() -> io::Result<HandlePriv>,
    {
        let mut state = self.state.load(SeqCst);

        loop {
            match state {
                INIT => {
                    // Registration is currently not associated with a handle.
                    // Get a handle then attempt to lock the state.
                    let handle = f()?;

                    let actual = self.state.compare_and_swap(INIT, LOCKED, SeqCst);

                    if actual != state {
                        state = actual;
                        continue;
                    }

                    // Create the actual registration
                    let (inner, res) = Inner::new(io, handle);

                    unsafe {
                        *self.inner.get() = Some(inner);
                    }

                    // Transition out of the locked state. This acquires the
                    // current value, potentially having a list of tasks that
                    // are pending readiness notifications.
                    let actual = self.state.swap(READY, SeqCst);

                    // Consume the stack of nodes

                    let mut read = false;
                    let mut write = false;
                    let mut ptr = (actual & !LIFECYCLE_MASK) as *mut Node;

                    let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };

                    while !ptr.is_null() {
                        let node = unsafe { Box::from_raw(ptr) };
                        let node = *node;
                        let Node {
                            direction,
                            task,
                            next,
                        } = node;

                        let flag = match direction {
                            Direction::Read => &mut read,
                            Direction::Write => &mut write,
                        };

                        if !*flag {
                            *flag = true;

                            inner.register(direction, task);
                        }

                        ptr = next;
                    }

                    return res.map(|_| true);
                }
                _ => return Ok(false),
            }
        }
    }

    /// Poll for events on the I/O resource's read readiness stream.
    ///
    /// If the I/O resource receives a new read readiness event since the last
    /// call to `poll_read_ready`, it is returned. If it has not, the current
    /// task is notified once a new event is received.
    ///
    /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
    /// the function will always return `Ready(HUP)`. This should be treated as
    /// the end of the readiness stream.
    ///
    /// Ensure that [`register`] has been called first.
    ///
    /// # Return value
    ///
    /// There are several possible return values:
    ///
    /// * `Ok(Async::Ready(readiness))` means that the I/O resource has received
    ///   a new readiness event. The readiness value is included.
    ///
    /// * `Ok(NotReady)` means that no new readiness events have been received
    ///   since the last call to `poll_read_ready`.
    ///
    /// * `Err(err)` means that the registration has encountered an error. This
    ///   error either represents a permanent internal error **or** the fact
    ///   that [`register`] was not called first.
    ///
    /// [`register`]: #method.register
    /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
    ///
    /// # Panics
    ///
    /// This function will panic if called from outside of a task context.
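    ///
    /// # Examples
    ///
    /// A sketch (assuming `futures` 0.1) of turning the read readiness stream
    /// into a future with `futures::future::poll_fn`, so that it is polled
    /// from within a task context:
    ///
    /// ```no_run
    /// use tokio_reactor::Registration;
    /// use futures::future;
    ///
    /// let registration = Registration::new();
    /// // ... `register` is called with an I/O resource elsewhere ...
    ///
    /// // Resolves once a read readiness event is received. The future must be
    /// // polled by an executor (e.g. spawned onto a Tokio runtime).
    /// let read_ready = future::poll_fn(move || registration.poll_read_ready());
    /// ```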
    pub fn poll_read_ready(&self) -> Poll<mio::Ready, io::Error> {
        self.poll_ready(Direction::Read, Notify::Yes)
            .map(|v| match v {
                Some(v) => Async::Ready(v),
                _ => Async::NotReady,
            })
    }

    /// Consume any pending read readiness event.
    ///
    /// This function is identical to [`poll_read_ready`] **except** that it
    /// will not notify the current task when a new event is received. As such,
    /// it is safe to call this function from outside of a task context.
    ///
    /// [`poll_read_ready`]: #method.poll_read_ready
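    ///
    /// # Examples
    ///
    /// A sketch of draining a pending read readiness event without scheduling
    /// a wake-up, which is safe to do outside of a task context:
    ///
    /// ```no_run
    /// use tokio_reactor::Registration;
    ///
    /// let registration = Registration::new();
    /// // ... `register` is called with an I/O resource elsewhere ...
    ///
    /// if let Some(ready) = registration.take_read_ready().unwrap() {
    ///     println!("resource is ready: {:?}", ready);
    /// }
    /// ```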
    pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
        self.poll_ready(Direction::Read, Notify::No)
    }

    /// Poll for events on the I/O resource's write readiness stream.
    ///
    /// If the I/O resource receives a new write readiness event since the last
    /// call to `poll_write_ready`, it is returned. If it has not, the current
    /// task is notified once a new event is received.
    ///
    /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
    /// the function will always return `Ready(HUP)`. This should be treated as
    /// the end of the readiness stream.
    ///
    /// Ensure that [`register`] has been called first.
    ///
    /// # Return value
    ///
    /// There are several possible return values:
    ///
    /// * `Ok(Async::Ready(readiness))` means that the I/O resource has received
    ///   a new readiness event. The readiness value is included.
    ///
    /// * `Ok(NotReady)` means that no new readiness events have been received
    ///   since the last call to `poll_write_ready`.
    ///
    /// * `Err(err)` means that the registration has encountered an error. This
    ///   error either represents a permanent internal error **or** the fact
    ///   that [`register`] was not called first.
    ///
    /// [`register`]: #method.register
    /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
    ///
    /// # Panics
    ///
    /// This function will panic if called from outside of a task context.
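    ///
    /// # Examples
    ///
    /// As with `poll_read_ready`, a sketch (assuming `futures` 0.1) of
    /// adapting the write readiness stream with `futures::future::poll_fn`:
    ///
    /// ```no_run
    /// use tokio_reactor::Registration;
    /// use futures::future;
    ///
    /// let registration = Registration::new();
    /// // ... `register` is called with an I/O resource elsewhere ...
    ///
    /// let write_ready = future::poll_fn(move || registration.poll_write_ready());
    /// ```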
    pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
        self.poll_ready(Direction::Write, Notify::Yes)
            .map(|v| match v {
                Some(v) => Async::Ready(v),
                _ => Async::NotReady,
            })
    }

    /// Consume any pending write readiness event.
    ///
    /// This function is identical to [`poll_write_ready`] **except** that it
    /// will not notify the current task when a new event is received. As such,
    /// it is safe to call this function from outside of a task context.
    ///
    /// [`poll_write_ready`]: #method.poll_write_ready
    pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
        self.poll_ready(Direction::Write, Notify::No)
    }

    fn poll_ready(&self, direction: Direction, notify: Notify) -> io::Result<Option<mio::Ready>> {
        let mut state = self.state.load(SeqCst);

        // Cache the node pointer
        let mut node = None;

        loop {
            match state {
                INIT => {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        "must call `register` before poll_read_ready",
                    ));
                }
                READY => {
                    let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
                    return inner.poll_ready(direction, notify);
                }
                LOCKED => {
                    if let Notify::No = notify {
                        // Skip the notification tracking junk.
                        return Ok(None);
                    }

                    let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node;

                    let task = task::current();

                    // Get the node
                    let mut n = node.take().unwrap_or_else(|| {
                        Box::new(Node {
                            direction,
                            task,
                            next: ptr::null_mut(),
                        })
                    });

                    n.next = next_ptr;

                    let node_ptr = Box::into_raw(n);
                    let next = node_ptr as usize | (state & LIFECYCLE_MASK);

                    let actual = self.state.compare_and_swap(state, next, SeqCst);

                    if actual != state {
                        // Back out of the node boxing
                        let n = unsafe { Box::from_raw(node_ptr) };

                        // Save this for next loop
                        node = Some(n);

                        state = actual;
                        continue;
                    }

                    return Ok(None);
                }
                _ => unreachable!(),
            }
        }
    }
}

unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}

// ===== impl Inner =====

impl Inner {
    fn new<T>(io: &T, handle: HandlePriv) -> (Self, io::Result<()>)
    where
        T: Evented,
    {
        let mut res = Ok(());

        let token = match handle.inner() {
            Some(inner) => match inner.add_source(io) {
                Ok(token) => token,
                Err(e) => {
                    res = Err(e);
                    ERROR
                }
            },
            None => {
                res = Err(io::Error::new(io::ErrorKind::Other, "event loop gone"));
                ERROR
            }
        };

        let inner = Inner { handle, token };

        (inner, res)
    }

    fn register(&self, direction: Direction, task: Task) {
        if self.token == ERROR {
            task.notify();
            return;
        }

        let inner = match self.handle.inner() {
            Some(inner) => inner,
            None => {
                task.notify();
                return;
            }
        };

        inner.register(self.token, direction, task);
    }

    fn deregister<E: Evented>(&self, io: &E) -> io::Result<()> {
        if self.token == ERROR {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "failed to associate with reactor",
            ));
        }

        let inner = match self.handle.inner() {
            Some(inner) => inner,
            None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
        };

        inner.deregister_source(io)
    }

    fn poll_ready(&self, direction: Direction, notify: Notify) -> io::Result<Option<mio::Ready>> {
        if self.token == ERROR {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "failed to associate with reactor",
            ));
        }

        let inner = match self.handle.inner() {
            Some(inner) => inner,
            None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
        };

        let mask = direction.mask();
        let mask_no_hup = (mask - ::platform::hup()).as_usize();

        let io_dispatch = inner.io_dispatch.read();
        let sched = &io_dispatch[self.token];

        // This consumes the current readiness state **except** for HUP. HUP is
        // excluded because a) it is a final state and never transitions out of
        // HUP and b) both the read AND the write directions need to be able to
        // observe this state.
        //
        // If HUP were to be cleared when `direction` is `Read`, then when
        // `poll_ready` is called again with a `direction` of `Write`, the HUP
        // state would not be visible.
        let mut ready =
            mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst));

        if ready.is_empty() && notify == Notify::Yes {
            debug!("scheduling {:?} for: {}", direction, self.token);
            // Update the task info
            match direction {
                Direction::Read => sched.reader.register(),
                Direction::Write => sched.writer.register(),
            }

            // Try again
            ready = mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst));
        }

        if ready.is_empty() {
            Ok(None)
        } else {
            Ok(Some(ready))
        }
    }
}

impl Drop for Inner {
    fn drop(&mut self) {
        if self.token == ERROR {
            return;
        }

        let inner = match self.handle.inner() {
            Some(inner) => inner,
            None => return,
        };

        inner.drop_source(self.token);
    }
}