tokio_reactor/
registration.rs

1use {Direction, Handle, HandlePriv, Task};
2
3use futures::{task, Async, Poll};
4use mio::{self, Evented};
5
6use std::cell::UnsafeCell;
7use std::sync::atomic::AtomicUsize;
8use std::sync::atomic::Ordering::SeqCst;
9use std::{io, ptr, usize};
10
11/// Associates an I/O resource with the reactor instance that drives it.
12///
13/// A registration represents an I/O resource registered with a Reactor such
14/// that it will receive task notifications on readiness. This is the lowest
15/// level API for integrating with a reactor.
16///
17/// The association between an I/O resource is made by calling [`register`].
18/// Once the association is established, it remains established until the
19/// registration instance is dropped. Subsequent calls to [`register`] are
20/// no-ops.
21///
22/// A registration instance represents two separate readiness streams. One for
23/// the read readiness and one for write readiness. These streams are
24/// independent and can be consumed from separate tasks.
25///
26/// **Note**: while `Registration` is `Sync`, the caller must ensure that there
27/// are at most two tasks that use a registration instance concurrently. One
28/// task for [`poll_read_ready`] and one task for [`poll_write_ready`]. While
29/// violating this requirement is "safe" from a Rust memory safety point of
30/// view, it will result in unexpected behavior in the form of lost
31/// notifications and tasks hanging.
32///
33/// ## Platform-specific events
34///
35/// `Registration` also allows receiving platform-specific `mio::Ready` events.
36/// These events are included as part of the read readiness event stream. The
37/// write readiness event stream is only for `Ready::writable()` events.
38///
39/// [`register`]: #method.register
40/// [`poll_read_ready`]: #method.poll_read_ready`]
41/// [`poll_write_ready`]: #method.poll_write_ready`]
42#[derive(Debug)]
43pub struct Registration {
44    /// Stores the handle. Once set, the value is not changed.
45    ///
46    /// Setting this requires acquiring the lock from state.
47    inner: UnsafeCell<Option<Inner>>,
48
49    /// Tracks the state of the registration.
50    ///
51    /// The least significant 2 bits are used to track the lifecycle of the
52    /// registration. The rest of the `state` variable is a pointer to tasks
53    /// that must be notified once the lock is released.
54    state: AtomicUsize,
55}
56
57#[derive(Debug)]
58struct Inner {
59    handle: HandlePriv,
60    token: usize,
61}
62
63#[derive(PartialEq)]
64enum Notify {
65    Yes,
66    No,
67}
68
69/// Tasks waiting on readiness notifications.
70#[derive(Debug)]
71struct Node {
72    direction: Direction,
73    task: Task,
74    next: *mut Node,
75}
76
77/// Initial state. The handle is not set and the registration is idle.
78const INIT: usize = 0;
79
80/// A thread locked the state and will associate a handle.
81const LOCKED: usize = 1;
82
83/// A handle has been associated with the registration.
84const READY: usize = 2;
85
86/// Masks the lifecycle state
87const LIFECYCLE_MASK: usize = 0b11;
88
89/// A fake token used to identify error situations
90const ERROR: usize = usize::MAX;
91
92// ===== impl Registration =====
93
94impl Registration {
95    /// Create a new `Registration`.
96    ///
97    /// This registration is not associated with a Reactor instance. Call
98    /// `register` to establish the association.
99    pub fn new() -> Registration {
100        Registration {
101            inner: UnsafeCell::new(None),
102            state: AtomicUsize::new(INIT),
103        }
104    }
105
106    /// Register the I/O resource with the default reactor.
107    ///
108    /// This function is safe to call concurrently and repeatedly. However, only
109    /// the first call will establish the registration. Subsequent calls will be
110    /// no-ops.
111    ///
112    /// # Return
113    ///
114    /// If the registration happened successfully, `Ok(true)` is returned.
115    ///
116    /// If an I/O resource has previously been successfully registered,
117    /// `Ok(false)` is returned.
118    ///
119    /// If an error is encountered during registration, `Err` is returned.
120    pub fn register<T>(&self, io: &T) -> io::Result<bool>
121    where
122        T: Evented,
123    {
124        self.register2(io, || HandlePriv::try_current())
125    }
126
127    /// Deregister the I/O resource from the reactor it is associated with.
128    ///
129    /// This function must be called before the I/O resource associated with the
130    /// registration is dropped.
131    ///
132    /// Note that deregistering does not guarantee that the I/O resource can be
133    /// registered with a different reactor. Some I/O resource types can only be
134    /// associated with a single reactor instance for their lifetime.
135    ///
136    /// # Return
137    ///
138    /// If the deregistration was successful, `Ok` is returned. Any calls to
139    /// `Reactor::turn` that happen after a successful call to `deregister` will
140    /// no longer result in notifications getting sent for this registration.
141    ///
142    /// `Err` is returned if an error is encountered.
143    pub fn deregister<T>(&mut self, io: &T) -> io::Result<()>
144    where
145        T: Evented,
146    {
147        // The state does not need to be checked and coordination is not
148        // necessary as this function takes `&mut self`. This guarantees a
149        // single thread is accessing the instance.
150        if let Some(inner) = unsafe { (*self.inner.get()).as_ref() } {
151            inner.deregister(io)?;
152        }
153
154        Ok(())
155    }
156
157    /// Register the I/O resource with the specified reactor.
158    ///
159    /// This function is safe to call concurrently and repeatedly. However, only
160    /// the first call will establish the registration. Subsequent calls will be
161    /// no-ops.
162    ///
163    /// If the registration happened successfully, `Ok(true)` is returned.
164    ///
165    /// If an I/O resource has previously been successfully registered,
166    /// `Ok(false)` is returned.
167    ///
168    /// If an error is encountered during registration, `Err` is returned.
169    pub fn register_with<T>(&self, io: &T, handle: &Handle) -> io::Result<bool>
170    where
171        T: Evented,
172    {
173        self.register2(io, || match handle.as_priv() {
174            Some(handle) => Ok(handle.clone()),
175            None => HandlePriv::try_current(),
176        })
177    }
178
179    pub(crate) fn register_with_priv<T>(&self, io: &T, handle: &HandlePriv) -> io::Result<bool>
180    where
181        T: Evented,
182    {
183        self.register2(io, || Ok(handle.clone()))
184    }
185
186    fn register2<T, F>(&self, io: &T, f: F) -> io::Result<bool>
187    where
188        T: Evented,
189        F: Fn() -> io::Result<HandlePriv>,
190    {
191        let mut state = self.state.load(SeqCst);
192
193        loop {
194            match state {
195                INIT => {
196                    // Registration is currently not associated with a handle.
197                    // Get a handle then attempt to lock the state.
198                    let handle = f()?;
199
200                    let actual = self.state.compare_and_swap(INIT, LOCKED, SeqCst);
201
202                    if actual != state {
203                        state = actual;
204                        continue;
205                    }
206
207                    // Create the actual registration
208                    let (inner, res) = Inner::new(io, handle);
209
210                    unsafe {
211                        *self.inner.get() = Some(inner);
212                    }
213
214                    // Transition out of the locked state. This acquires the
215                    // current value, potentially having a list of tasks that
216                    // are pending readiness notifications.
217                    let actual = self.state.swap(READY, SeqCst);
218
219                    // Consume the stack of nodes
220
221                    let mut read = false;
222                    let mut write = false;
223                    let mut ptr = (actual & !LIFECYCLE_MASK) as *mut Node;
224
225                    let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
226
227                    while !ptr.is_null() {
228                        let node = unsafe { Box::from_raw(ptr) };
229                        let node = *node;
230                        let Node {
231                            direction,
232                            task,
233                            next,
234                        } = node;
235
236                        let flag = match direction {
237                            Direction::Read => &mut read,
238                            Direction::Write => &mut write,
239                        };
240
241                        if !*flag {
242                            *flag = true;
243
244                            inner.register(direction, task);
245                        }
246
247                        ptr = next;
248                    }
249
250                    return res.map(|_| true);
251                }
252                _ => return Ok(false),
253            }
254        }
255    }
256
257    /// Poll for events on the I/O resource's read readiness stream.
258    ///
259    /// If the I/O resource receives a new read readiness event since the last
260    /// call to `poll_read_ready`, it is returned. If it has not, the current
261    /// task is notified once a new event is received.
262    ///
263    /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
264    /// the function will always return `Ready(HUP)`. This should be treated as
265    /// the end of the readiness stream.
266    ///
267    /// Ensure that [`register`] has been called first.
268    ///
269    /// # Return value
270    ///
271    /// There are several possible return values:
272    ///
273    /// * `Ok(Async::Ready(readiness))` means that the I/O resource has received
274    ///   a new readiness event. The readiness value is included.
275    ///
276    /// * `Ok(NotReady)` means that no new readiness events have been received
277    ///   since the last call to `poll_read_ready`.
278    ///
279    /// * `Err(err)` means that the registration has encountered an error. This
280    ///   error either represents a permanent internal error **or** the fact
281    ///   that [`register`] was not called first.
282    ///
283    /// [`register`]: #method.register
284    /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
285    ///
286    /// # Panics
287    ///
288    /// This function will panic if called from outside of a task context.
289    pub fn poll_read_ready(&self) -> Poll<mio::Ready, io::Error> {
290        self.poll_ready(Direction::Read, Notify::Yes)
291            .map(|v| match v {
292                Some(v) => Async::Ready(v),
293                _ => Async::NotReady,
294            })
295    }
296
297    /// Consume any pending read readiness event.
298    ///
299    /// This function is identical to [`poll_read_ready`] **except** that it
300    /// will not notify the current task when a new event is received. As such,
301    /// it is safe to call this function from outside of a task context.
302    ///
303    /// [`poll_read_ready`]: #method.poll_read_ready
304    pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
305        self.poll_ready(Direction::Read, Notify::No)
306    }
307
308    /// Poll for events on the I/O resource's write readiness stream.
309    ///
310    /// If the I/O resource receives a new write readiness event since the last
311    /// call to `poll_write_ready`, it is returned. If it has not, the current
312    /// task is notified once a new event is received.
313    ///
314    /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
315    /// the function will always return `Ready(HUP)`. This should be treated as
316    /// the end of the readiness stream.
317    ///
318    /// Ensure that [`register`] has been called first.
319    ///
320    /// # Return value
321    ///
322    /// There are several possible return values:
323    ///
324    /// * `Ok(Async::Ready(readiness))` means that the I/O resource has received
325    ///   a new readiness event. The readiness value is included.
326    ///
327    /// * `Ok(NotReady)` means that no new readiness events have been received
328    ///   since the last call to `poll_write_ready`.
329    ///
330    /// * `Err(err)` means that the registration has encountered an error. This
331    ///   error either represents a permanent internal error **or** the fact
332    ///   that [`register`] was not called first.
333    ///
334    /// [`register`]: #method.register
335    /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
336    ///
337    /// # Panics
338    ///
339    /// This function will panic if called from outside of a task context.
340    pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
341        self.poll_ready(Direction::Write, Notify::Yes)
342            .map(|v| match v {
343                Some(v) => Async::Ready(v),
344                _ => Async::NotReady,
345            })
346    }
347
348    /// Consume any pending write readiness event.
349    ///
350    /// This function is identical to [`poll_write_ready`] **except** that it
351    /// will not notify the current task when a new event is received. As such,
352    /// it is safe to call this function from outside of a task context.
353    ///
354    /// [`poll_write_ready`]: #method.poll_write_ready
355    pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
356        self.poll_ready(Direction::Write, Notify::No)
357    }
358
359    fn poll_ready(&self, direction: Direction, notify: Notify) -> io::Result<Option<mio::Ready>> {
360        let mut state = self.state.load(SeqCst);
361
362        // Cache the node pointer
363        let mut node = None;
364
365        loop {
366            match state {
367                INIT => {
368                    return Err(io::Error::new(
369                        io::ErrorKind::Other,
370                        "must call `register`
371                                              before poll_read_ready",
372                    ));
373                }
374                READY => {
375                    let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
376                    return inner.poll_ready(direction, notify);
377                }
378                LOCKED => {
379                    if let Notify::No = notify {
380                        // Skip the notification tracking junk.
381                        return Ok(None);
382                    }
383
384                    let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node;
385
386                    let task = task::current();
387
388                    // Get the node
389                    let mut n = node.take().unwrap_or_else(|| {
390                        Box::new(Node {
391                            direction,
392                            task: task,
393                            next: ptr::null_mut(),
394                        })
395                    });
396
397                    n.next = next_ptr;
398
399                    let node_ptr = Box::into_raw(n);
400                    let next = node_ptr as usize | (state & LIFECYCLE_MASK);
401
402                    let actual = self.state.compare_and_swap(state, next, SeqCst);
403
404                    if actual != state {
405                        // Back out of the node boxing
406                        let n = unsafe { Box::from_raw(node_ptr) };
407
408                        // Save this for next loop
409                        node = Some(n);
410
411                        state = actual;
412                        continue;
413                    }
414
415                    return Ok(None);
416                }
417                _ => unreachable!(),
418            }
419        }
420    }
421}
422
423unsafe impl Send for Registration {}
424unsafe impl Sync for Registration {}
425
426// ===== impl Inner =====
427
428impl Inner {
429    fn new<T>(io: &T, handle: HandlePriv) -> (Self, io::Result<()>)
430    where
431        T: Evented,
432    {
433        let mut res = Ok(());
434
435        let token = match handle.inner() {
436            Some(inner) => match inner.add_source(io) {
437                Ok(token) => token,
438                Err(e) => {
439                    res = Err(e);
440                    ERROR
441                }
442            },
443            None => {
444                res = Err(io::Error::new(io::ErrorKind::Other, "event loop gone"));
445                ERROR
446            }
447        };
448
449        let inner = Inner { handle, token };
450
451        (inner, res)
452    }
453
454    fn register(&self, direction: Direction, task: Task) {
455        if self.token == ERROR {
456            task.notify();
457            return;
458        }
459
460        let inner = match self.handle.inner() {
461            Some(inner) => inner,
462            None => {
463                task.notify();
464                return;
465            }
466        };
467
468        inner.register(self.token, direction, task);
469    }
470
471    fn deregister<E: Evented>(&self, io: &E) -> io::Result<()> {
472        if self.token == ERROR {
473            return Err(io::Error::new(
474                io::ErrorKind::Other,
475                "failed to associate with reactor",
476            ));
477        }
478
479        let inner = match self.handle.inner() {
480            Some(inner) => inner,
481            None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
482        };
483
484        inner.deregister_source(io)
485    }
486
487    fn poll_ready(&self, direction: Direction, notify: Notify) -> io::Result<Option<mio::Ready>> {
488        if self.token == ERROR {
489            return Err(io::Error::new(
490                io::ErrorKind::Other,
491                "failed to associate with reactor",
492            ));
493        }
494
495        let inner = match self.handle.inner() {
496            Some(inner) => inner,
497            None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
498        };
499
500        let mask = direction.mask();
501        let mask_no_hup = (mask - ::platform::hup()).as_usize();
502
503        let io_dispatch = inner.io_dispatch.read();
504        let sched = &io_dispatch[self.token];
505
506        // This consumes the current readiness state **except** for HUP. HUP is
507        // excluded because a) it is a final state and never transitions out of
508        // HUP and b) both the read AND the write directions need to be able to
509        // observe this state.
510        //
511        // If HUP were to be cleared when `direction` is `Read`, then when
512        // `poll_ready` is called again with a _`direction` of `Write`, the HUP
513        // state would not be visible.
514        let mut ready =
515            mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst));
516
517        if ready.is_empty() && notify == Notify::Yes {
518            debug!("scheduling {:?} for: {}", direction, self.token);
519            // Update the task info
520            match direction {
521                Direction::Read => sched.reader.register(),
522                Direction::Write => sched.writer.register(),
523            }
524
525            // Try again
526            ready = mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst));
527        }
528
529        if ready.is_empty() {
530            Ok(None)
531        } else {
532            Ok(Some(ready))
533        }
534    }
535}
536
537impl Drop for Inner {
538    fn drop(&mut self) {
539        if self.token == ERROR {
540            return;
541        }
542
543        let inner = match self.handle.inner() {
544            Some(inner) => inner,
545            None => return,
546        };
547
548        inner.drop_source(self.token);
549    }
550}