tokio_threadpool/worker/mod.rs

mod entry;
mod stack;
mod state;

pub(crate) use self::entry::WorkerEntry as Entry;
pub(crate) use self::stack::Stack;
pub(crate) use self::state::{Lifecycle, State};

use notifier::Notifier;
use pool::{self, BackupId, Pool};
use sender::Sender;
use shutdown::ShutdownTrigger;
use task::{self, CanBlock, Task};

use tokio_executor;

use futures::{Async, Poll};

use std::cell::Cell;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Thread worker
///
/// This is passed to the [`around_worker`] callback set on [`Builder`]. This
/// callback is only expected to call [`run`] on it.
///
/// [`Builder`]: struct.Builder.html
/// [`around_worker`]: struct.Builder.html#method.around_worker
/// [`run`]: struct.Worker.html#method.run
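///
/// # Examples
///
/// A minimal sketch of the intended flow, assuming the `Builder` API from
/// this crate (marked `ignore` because `Worker` cannot be constructed
/// directly in user code):
///
/// ```ignore
/// let pool = tokio_threadpool::Builder::new()
///     .around_worker(|worker, _enter| {
///         // Per-thread setup could run here.
///         worker.run(); // blocks until the pool shuts down
///         // Per-thread teardown could run here.
///     })
///     .build();
/// ```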
#[derive(Debug)]
pub struct Worker {
    // Shared scheduler data
    pub(crate) pool: Arc<Pool>,

    // WorkerEntry index
    pub(crate) id: WorkerId,

    // Backup thread ID assigned to process this worker.
    backup_id: BackupId,

    // Set to the task that is currently being polled by the worker. This is
    // needed so that `blocking` blocks are able to interact with this task.
    //
    // This has to be a raw pointer to make it compile, but great care is taken
    // when this is set.
    current_task: CurrentTask,

    // Set when the thread is in blocking mode.
    is_blocking: Cell<bool>,

    // Set when the worker should finalize on drop.
    should_finalize: Cell<bool>,

    // Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped.
    trigger: Arc<ShutdownTrigger>,

    // Keep the value on the current thread (the `Rc` makes `Worker` `!Send`).
    _p: PhantomData<Rc<()>>,
}

/// Tracks the state related to the currently running task.
#[derive(Debug)]
struct CurrentTask {
    /// This has to be a raw pointer to make it compile, but great care is taken
    /// when this is set.
    task: Cell<Option<*const Arc<Task>>>,

    /// Tracks the blocking capacity allocation state.
    can_block: Cell<CanBlock>,
}

/// Identifies a thread pool worker.
///
/// This identifier is unique within a single thread pool instance; different
/// thread pool instances may reuse the same identifier values.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct WorkerId(pub(crate) usize);

// Pointer to the current worker info
thread_local!(static CURRENT_WORKER: Cell<*const Worker> = Cell::new(0 as *const _));

impl Worker {
    pub(crate) fn new(
        id: WorkerId,
        backup_id: BackupId,
        pool: Arc<Pool>,
        trigger: Arc<ShutdownTrigger>,
    ) -> Worker {
        Worker {
            pool,
            id,
            backup_id,
            current_task: CurrentTask::new(),
            is_blocking: Cell::new(false),
            should_finalize: Cell::new(false),
            trigger,
            _p: PhantomData,
        }
    }

    pub(crate) fn is_blocking(&self) -> bool {
        self.is_blocking.get()
    }

    /// Run the worker
    ///
    /// Returns `true` if the thread should keep running as a `backup` thread.
    pub(crate) fn do_run(&self) -> bool {
        // Create another worker... It's ok, this is just a new type around
        // `Pool` that is expected to stay on the current thread.
        CURRENT_WORKER.with(|c| {
            c.set(self as *const _);

            let pool = self.pool.clone();
            let mut sender = Sender { pool };

            // Enter an execution context
            let mut enter = tokio_executor::enter().unwrap();

            tokio_executor::with_default(&mut sender, &mut enter, |enter| {
                if let Some(ref callback) = self.pool.config.around_worker {
                    callback.call(self, enter);
                } else {
                    self.run();
                }
            });
        });

        // Can't be in blocking mode and finalization mode
        debug_assert!(!self.is_blocking.get() || !self.should_finalize.get());

        self.is_blocking.get()
    }

    pub(crate) fn with_current<F: FnOnce(Option<&Worker>) -> R, R>(f: F) -> R {
        CURRENT_WORKER.with(move |c| {
            let ptr = c.get();

            if ptr.is_null() {
                f(None)
            } else {
                f(Some(unsafe { &*ptr }))
            }
        })
    }
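
    // A minimal usage sketch (hypothetical caller, modeled on the `blocking`
    // entry point in this crate): look up the worker driving the current
    // thread and, when on a pool thread, ask it to enter blocking mode.
    //
    //     Worker::with_current(|worker| match worker {
    //         Some(worker) => worker.transition_to_blocking(),
    //         None => panic!("not called from a thread pool worker"),
    //     });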

    /// Transition the current worker to a blocking worker
    pub(crate) fn transition_to_blocking(&self) -> Poll<(), ::BlockingError> {
        use self::CanBlock::*;

        // If we get this far, then `current_task` has been set.
        let task_ref = self.current_task.get_ref();

        // First step is to acquire blocking capacity for the task.
        match self.current_task.can_block() {
            // Capacity to block has already been allocated to this task.
            Allocated => {}

            // The task has already requested capacity to block, but there is
            // none yet available.
            NoCapacity => return Ok(Async::NotReady),

            // The task has yet to ask for capacity
            CanRequest => {
                // Atomically attempt to acquire blocking capacity, and if none
                // is available, register the task to be notified once capacity
                // becomes available.
                match self.pool.poll_blocking_capacity(task_ref)? {
                    Async::Ready(()) => {
                        self.current_task.set_can_block(Allocated);
                    }
                    Async::NotReady => {
                        self.current_task.set_can_block(NoCapacity);
                        return Ok(Async::NotReady);
                    }
                }
            }
        }

        // The task has been allocated blocking capacity. At this point, the
        // current thread transitions from a worker to a backup thread, which
        // requires handing the worker over to another backup thread.

        if self.is_blocking.get() {
            // The thread is already in blocking mode, so there is nothing else
            // to do. Return `Ready` and allow the caller to block the thread.
            return Ok(().into());
        }

        trace!("transition to blocking state");

        // Transitioning to blocking requires handing over the worker state to
        // another thread so that the work queue can continue to be processed.

        self.pool.spawn_thread(self.id.clone(), &self.pool);

        // Track that the thread has now fully entered the blocking state.
        self.is_blocking.set(true);

        Ok(().into())
    }
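
    // A rough summary of the `CanBlock` transitions driven above and by
    // `CurrentTask::can_block` below (a sketch, not exhaustive):
    //
    //     CanRequest --poll_blocking_capacity--> Allocated    capacity granted
    //     CanRequest --poll_blocking_capacity--> NoCapacity   task queued for a notify
    //     NoCapacity --consume_blocking_allocation--> Allocated   once notified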

    /// Transition from blocking
    pub(crate) fn transition_from_blocking(&self) {
        // TODO: Attempt to take ownership of the worker again.
    }

    /// Returns a reference to the worker's identifier.
    ///
    /// This identifier is unique within a single thread pool instance; different
    /// thread pool instances may reuse the same identifier values.
    pub fn id(&self) -> &WorkerId {
        &self.id
    }

    /// Run the worker
    ///
    /// This function blocks until the worker is shutting down.
    pub fn run(&self) {
        const MAX_SPINS: usize = 3;
        const LIGHT_SLEEP_INTERVAL: usize = 32;

        // Get the notifier.
        let notify = Arc::new(Notifier {
            pool: self.pool.clone(),
        });

        let mut first = true;
        let mut spin_cnt = 0;
        let mut tick = 0;

        while self.check_run_state(first) {
            first = false;

            // Run the next available task
            if self.try_run_task(&notify) {
                if self.is_blocking.get() {
                    // Exit out of the run state
                    return;
                }

                // Poll the reactor and the global queue every now and then to
                // ensure no task gets left behind.
                if tick % LIGHT_SLEEP_INTERVAL == 0 {
                    self.sleep_light();
                }

                tick = tick.wrapping_add(1);
                spin_cnt = 0;

                // As long as there is work, keep looping.
                continue;
            }

            spin_cnt += 1;

            // Yield the thread several times before it actually goes to sleep.
            if spin_cnt <= MAX_SPINS {
                thread::yield_now();
                continue;
            }

            tick = 0;
            spin_cnt = 0;

            // Starting to get sleeeeepy
            if !self.sleep() {
                return;
            }

            // If there still isn't any work to do, shut down the worker?
        }

        // The pool is terminating. However, transitioning the pool state to
        // terminated is the very first step of the finalization process. Other
        // threads may not see this state and try to spawn a new thread. To
        // ensure consistency, before the current thread shuts down, it must
        // return the backup token to the stack.
        //
        // The returned result is ignored because `Err` represents the pool
        // shutting down. We are currently aware of this fact.
        let _ = self.pool.release_backup(self.backup_id);

        self.should_finalize.set(true);
    }

    /// Try to run a task
    ///
    /// Returns `true` if work was found.
    #[inline]
    fn try_run_task(&self, notify: &Arc<Notifier>) -> bool {
        if self.try_run_owned_task(notify) {
            return true;
        }

        self.try_steal_task(notify)
    }

    /// Checks the worker's current state, updating it as needed.
    ///
    /// Returns `true` if the worker should run.
    #[inline]
    fn check_run_state(&self, first: bool) -> bool {
        use self::Lifecycle::*;

        debug_assert!(!self.is_blocking.get());

        let mut state: State = self.entry().state.load(Acquire).into();

        loop {
            let pool_state: pool::State = self.pool.state.load(Acquire).into();

            if pool_state.is_terminated() {
                return false;
            }

            let mut next = state;

            match state.lifecycle() {
                Running => break,
                Notified | Signaled => {
                    // transition back to running
                    next.set_lifecycle(Running);
                }
                Shutdown | Sleeping => {
                    // The worker should never be in these states when calling
                    // this function.
                    panic!("unexpected worker state; lifecycle={:?}", state.lifecycle());
                }
            }

            let actual = self
                .entry()
                .state
                .compare_and_swap(state.into(), next.into(), AcqRel)
                .into();

            if actual == state {
                break;
            }

            state = actual;
        }

        // `first` is set to true the first time this function is called after
        // the thread has started.
        //
        // This check handles the scenario where a worker gets signaled while
        // it is already happily running. The signaled state is intended to
        // wake up a worker that has been previously sleeping, in effect
        // increasing the number of active workers. If this is the first time
        // `check_run_state` is called, then being in a signaled state is
        // normal and the thread was started to handle it. However, if this is
        // **not** the first time the fn was called, then the number of active
        // workers has not been increased by the signal, so `signal_work` has
        // to be called again to try to wake up another worker.
        //
        // For example, suppose the thread pool is configured to allow 4
        // workers. Worker 1 is processing tasks from its `deque`. Worker 2
        // receives its first task. Worker 2 will pick a random worker to
        // signal. It does this by popping off the sleep stack, but there is no
        // guarantee that workers on the sleep stack are actually sleeping. It
        // is possible that Worker 1 gets signaled.
        //
        // Without this check, in the above case, no additional workers would
        // get started, leaving the thread pool permanently at 2 workers even
        // though it should reach 4.
        if !first && state.is_signaled() {
            trace!("Worker::check_run_state; delegate signal");
            // This worker is not ready to be signaled, so delegate the signal
            // to another worker.
            self.pool.signal_work(&self.pool);
        }

        true
    }

    /// Runs the next task on this worker's queue.
    ///
    /// Returns `true` if work was found.
    fn try_run_owned_task(&self, notify: &Arc<Notifier>) -> bool {
        // Poll the internal queue for a task to run
        match self.entry().pop_task() {
            Some(task) => {
                self.run_task(task, notify);
                true
            }
            None => false,
        }
    }

    /// Tries to steal a task from another worker.
    ///
    /// Returns `true` if work was found
    fn try_steal_task(&self, notify: &Arc<Notifier>) -> bool {
        use crossbeam_deque::Steal;

        debug_assert!(!self.is_blocking.get());

        let len = self.pool.workers.len();
        let mut idx = self.pool.rand_usize() % len;
        let mut found_work = false;
        let start = idx;

        loop {
            if idx < len {
                match self.pool.workers[idx].steal_tasks(self.entry()) {
                    Steal::Success(task) => {
                        trace!("stole task from another worker");

                        self.run_task(task, notify);

                        trace!(
                            "try_steal_task -- signal_work; self={}; from={}",
                            self.id.0,
                            idx
                        );

                        // Signal other workers that work is available
                        //
                        // TODO: Should this be called here or before
                        // `run_task`?
                        self.pool.signal_work(&self.pool);

                        return true;
                    }
                    Steal::Empty => {}
                    Steal::Retry => found_work = true,
                }

                idx += 1;
            } else {
                idx = 0;
            }

            if idx == start {
                break;
            }
        }

        found_work
    }

    fn run_task(&self, task: Arc<Task>, notify: &Arc<Notifier>) {
        use task::Run::*;

        // If this is the first time this task is being polled, register it so that we can keep
        // track of tasks that are in progress.
        if task.reg_worker.get().is_none() {
            task.reg_worker.set(Some(self.id.0 as u32));
            self.entry().register_task(&task);
        }

        let run = self.run_task2(&task, notify);

        // TODO: Try to claim back the worker state in case the backup thread
        // did not start up fast enough. This is a performance optimization.

        match run {
            Idle => {}
            Schedule => {
                if self.is_blocking.get() {
                    // The future has been notified while it was running.
                    // However, the future also entered a blocking section,
                    // which released the worker state from this thread.
                    //
                    // This means that scheduling the future must be done from
                    // a point of view external to the worker set.
                    //
                    // We have to call `submit_external` instead of `submit`
                    // here because `self` is still set as the current worker.
                    self.pool.submit_external(task, &self.pool);
                } else {
                    self.entry().push_internal(task);
                }
            }
            Complete => {
                let mut state: pool::State = self.pool.state.load(Acquire).into();

                loop {
                    let mut next = state;
                    next.dec_num_futures();

                    let actual = self
                        .pool
                        .state
                        .compare_and_swap(state.into(), next.into(), AcqRel)
                        .into();

                    if actual == state {
                        trace!("task complete; state={:?}", next);

                        if state.num_futures() == 1 {
                            // If the thread pool has been flagged as shutdown,
                            // start terminating workers. This involves waking
                            // up any sleeping workers so that they can notice
                            // the shutdown state.
                            if next.is_terminated() {
                                self.pool.terminate_sleeping_workers();
                            }
                        }

                        // Find which worker polled this task first.
                        let worker = task.reg_worker.get().unwrap() as usize;

                        // Unregister the task from the worker it was registered in.
                        if !self.is_blocking.get() && worker == self.id.0 {
                            self.entry().unregister_task(task);
                        } else {
                            self.pool.workers[worker].remotely_complete_task(task);
                        }

                        // The worker's run loop will detect the shutdown state
                        // next iteration.
                        return;
                    }

                    state = actual;
                }
            }
        }
    }

    /// Actually run the task. This is where `Worker::current_task` is set.
    ///
    /// Great care is needed to ensure that `current_task` is unset in this
    /// function.
    fn run_task2(&self, task: &Arc<Task>, notify: &Arc<Notifier>) -> task::Run {
        struct Guard<'a> {
            worker: &'a Worker,
        }

        impl<'a> Drop for Guard<'a> {
            fn drop(&mut self) {
                // Blocking capacity is sometimes allocated to a task before it
                // runs, i.e. when the task was notified precisely because
                // capacity to block became available. This capacity is "use it
                // or lose it": if the thread did not transition to blocking
                // during this poll, the capacity must be handed off by
                // notifying another task that is waiting to block.
                if !self.worker.is_blocking.get() {
                    let can_block = self.worker.current_task.can_block();
                    if can_block == CanBlock::Allocated {
                        self.worker.pool.notify_blocking_task(&self.worker.pool);
                    }
                }

                self.worker.current_task.clear();
            }
        }

        // Set `current_task`
        self.current_task.set(task, CanBlock::CanRequest);

        // Create the guard; this ensures that `current_task` is unset when the
        // function returns, even if the return is caused by a panic.
        let _g = Guard { worker: self };

        task.run(notify)
    }

    /// Put the worker to sleep
    ///
    /// Returns `true` if woken up due to new work arriving.
    fn sleep(&self) -> bool {
        use self::Lifecycle::*;

        // Putting a worker to sleep is a multipart operation, partly because a
        // worker can be notified without being popped from the sleep stack.
        // Extra care is needed to deal with this.
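        //
        // The protocol, in brief (a summary of the code below):
        //
        //   1. CAS the lifecycle to `Sleeping` and, unless already pushed,
        //      push this worker onto the sleep stack.
        //   2. Re-check for work via `sleep_light`, since a notification may
        //      have raced with step 1.
        //   3. Park the thread, looping because the underlying condvar can
        //      wake spuriously, until the lifecycle leaves `Sleeping`.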

        trace!("Worker::sleep; worker={:?}", self.id);

        let mut state: State = self.entry().state.load(Acquire).into();

        // The first part of the sleep process is to transition the worker state
        // to "pushed". Now, it may be that the worker is already pushed on the
        // sleeper stack, in which case, we don't push again.

        loop {
            let mut next = state;

            match state.lifecycle() {
                Running => {
                    // Try setting the pushed state
                    next.set_pushed();

                    // Transition the worker state to sleeping
                    next.set_lifecycle(Sleeping);
                }
                Notified | Signaled => {
                    // No need to sleep, transition back to running and move on.
                    next.set_lifecycle(Running);
                }
                Shutdown | Sleeping => {
                    // The worker cannot transition to sleep when already in a
                    // sleeping state.
                    panic!("unexpected worker state; actual={:?}", state.lifecycle());
                }
            }

            let actual = self
                .entry()
                .state
                .compare_and_swap(state.into(), next.into(), AcqRel)
                .into();

            if actual == state {
                if state.is_notified() {
                    // The previous state was notified, so we don't need to
                    // sleep.
                    return true;
                }

                if !state.is_pushed() {
                    debug_assert!(next.is_pushed());

                    trace!("  sleeping -- push to stack; idx={}", self.id.0);

                    // We obtained permission to push the worker into the
                    // sleeper queue.
                    if let Err(_) = self.pool.push_sleeper(self.id.0) {
                        trace!("  sleeping -- push to stack failed; idx={}", self.id.0);
                        // The push failed because the pool is terminating.
                        //
                        // Returning `true` is correct here: the "work" to wake
                        // up for is the shutdown itself.
                        return true;
                    }
                }

                break;
            }

            state = actual;
        }

        trace!("    -> starting to sleep; idx={}", self.id.0);

        // Do a quick check to see if there are any notifications in the
        // reactor or new tasks in the global queue. Since this call will
        // clear the wakeup token, we need to check the state again and
        // only after that go to sleep.
        self.sleep_light();

        // The state has been transitioned to sleeping; we can now wait by
        // calling the parker. This is done in a loop as condvars can wake up
        // spuriously.
        loop {
            // Reload the state
            state = self.entry().state.load(Acquire).into();

            // If the worker has been notified, transition back to running.
            match state.lifecycle() {
                Sleeping => {
                    // Still sleeping. Park again.
                }
                Notified | Signaled => {
                    // Transition back to running
                    loop {
                        let mut next = state;
                        next.set_lifecycle(Running);

                        let actual = self
                            .entry()
                            .state
                            .compare_and_swap(state.into(), next.into(), AcqRel)
                            .into();

                        if actual == state {
                            return true;
                        }

                        state = actual;
                    }
                }
                Shutdown | Running => {
                    // To get here, the block above transitioned the state to
                    // `Sleeping`. No other thread can concurrently
                    // transition to `Shutdown` or `Running`.
                    unreachable!();
                }
            }

            self.entry().park();

            trace!("    -> wakeup; idx={}", self.id.0);
        }
    }

    /// This doesn't actually put the thread to sleep. It calls
    /// `park.park_timeout` with a duration of 0. This allows the park
    /// implementation to perform any work that might be done on an interval.
    ///
    /// Also steals a batch of tasks from the global queue and, if any were
    /// found, signals other workers so the work gets processed.
    fn sleep_light(&self) {
        self.entry().park_timeout(Duration::from_millis(0));

        use crossbeam_deque::Steal;
        loop {
            match self.pool.queue.steal_batch(&self.entry().worker) {
                Steal::Success(()) => {
                    self.pool.signal_work(&self.pool);
                    break;
                }
                Steal::Empty => break,
                Steal::Retry => {}
            }
        }
    }

    fn entry(&self) -> &Entry {
        debug_assert!(!self.is_blocking.get());
        &self.pool.workers[self.id.0]
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        trace!("shutting down thread; idx={}", self.id.0);

        if self.should_finalize.get() {
            // Drain the work queue
            self.entry().drain_tasks();
        }
    }
}

// ===== impl CurrentTask =====

impl CurrentTask {
    /// Returns a default `CurrentTask` representing no task.
    fn new() -> CurrentTask {
        CurrentTask {
            task: Cell::new(None),
            can_block: Cell::new(CanBlock::CanRequest),
        }
    }

    /// Returns a reference to the task.
    fn get_ref(&self) -> &Arc<Task> {
        unsafe { &*self.task.get().unwrap() }
    }

    fn can_block(&self) -> CanBlock {
        use self::CanBlock::*;

        match self.can_block.get() {
            Allocated => Allocated,
            CanRequest | NoCapacity => {
                let can_block = self.get_ref().consume_blocking_allocation();
                self.can_block.set(can_block);
                can_block
            }
        }
    }

    fn set_can_block(&self, can_block: CanBlock) {
        self.can_block.set(can_block);
    }

    fn set(&self, task: &Arc<Task>, can_block: CanBlock) {
        self.task.set(Some(task as *const _));
        self.can_block.set(can_block);
    }

    /// Reset the `CurrentTask` to null state.
    fn clear(&self) {
        self.task.set(None);
        self.can_block.set(CanBlock::CanRequest);
    }
}

// ===== impl WorkerId =====

impl WorkerId {
    /// Returns a `WorkerId` representing the worker entry at index `idx`.
    pub(crate) fn new(idx: usize) -> WorkerId {
        WorkerId(idx)
    }

    /// Returns this identifier represented as an integer.
    ///
    /// Worker identifiers in a single thread pool are guaranteed to correspond to integers in the
    /// range `0..pool_size`.
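    ///
    /// # Examples
    ///
    /// A minimal sketch of the invariant (`WorkerId::new` is crate-private,
    /// so this only illustrates the relationship and is not compiled):
    ///
    /// ```ignore
    /// let id = WorkerId::new(3);
    /// assert_eq!(id.to_usize(), 3);
    /// ```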
    pub fn to_usize(&self) -> usize {
        self.0
    }
}
797}