tokio_threadpool/worker/mod.rs
1mod entry;
2mod stack;
3mod state;
4
5pub(crate) use self::entry::WorkerEntry as Entry;
6pub(crate) use self::stack::Stack;
7pub(crate) use self::state::{Lifecycle, State};
8
9use notifier::Notifier;
10use pool::{self, BackupId, Pool};
11use sender::Sender;
12use shutdown::ShutdownTrigger;
13use task::{self, CanBlock, Task};
14
15use tokio_executor;
16
17use futures::{Async, Poll};
18
19use std::cell::Cell;
20use std::marker::PhantomData;
21use std::rc::Rc;
22use std::sync::atomic::Ordering::{AcqRel, Acquire};
23use std::sync::Arc;
24use std::thread;
25use std::time::Duration;
26
27/// Thread worker
28///
29/// This is passed to the [`around_worker`] callback set on [`Builder`]. This
30/// callback is only expected to call [`run`] on it.
31///
32/// [`Builder`]: struct.Builder.html
33/// [`around_worker`]: struct.Builder.html#method.around_worker
34/// [`run`]: struct.Worker.html#method.run
35#[derive(Debug)]
36pub struct Worker {
37 // Shared scheduler data
38 pub(crate) pool: Arc<Pool>,
39
40 // WorkerEntry index
41 pub(crate) id: WorkerId,
42
43 // Backup thread ID assigned to processing this worker.
44 backup_id: BackupId,
45
46 // Set to the task that is currently being polled by the worker. This is
47 // needed so that `blocking` blocks are able to interact with this task.
48 //
49 // This has to be a raw pointer to make it compile, but great care is taken
50 // when this is set.
51 current_task: CurrentTask,
52
53 // Set when the thread is in blocking mode.
54 is_blocking: Cell<bool>,
55
56 // Set when the worker should finalize on drop
57 should_finalize: Cell<bool>,
58
59 // Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped.
60 trigger: Arc<ShutdownTrigger>,
61
62 // Keep the value on the current thread.
63 _p: PhantomData<Rc<()>>,
64}
65
66/// Tracks the state related to the currently running task.
67#[derive(Debug)]
68struct CurrentTask {
69 /// This has to be a raw pointer to make it compile, but great care is taken
70 /// when this is set.
71 task: Cell<Option<*const Arc<Task>>>,
72
73 /// Tracks the blocking capacity allocation state.
74 can_block: Cell<CanBlock>,
75}
76
/// Identifies a single worker of a thread pool.
///
/// Identifiers are only unique within one thread pool: two distinct thread
/// pool instances may hand out equal `WorkerId` values.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct WorkerId(pub(crate) usize);
83
84// Pointer to the current worker info
85thread_local!(static CURRENT_WORKER: Cell<*const Worker> = Cell::new(0 as *const _));
86
87impl Worker {
88 pub(crate) fn new(
89 id: WorkerId,
90 backup_id: BackupId,
91 pool: Arc<Pool>,
92 trigger: Arc<ShutdownTrigger>,
93 ) -> Worker {
94 Worker {
95 pool,
96 id,
97 backup_id,
98 current_task: CurrentTask::new(),
99 is_blocking: Cell::new(false),
100 should_finalize: Cell::new(false),
101 trigger,
102 _p: PhantomData,
103 }
104 }
105
106 pub(crate) fn is_blocking(&self) -> bool {
107 self.is_blocking.get()
108 }
109
110 /// Run the worker
111 ///
112 /// Returns `true` if the thread should keep running as a `backup` thread.
113 pub(crate) fn do_run(&self) -> bool {
114 // Create another worker... It's ok, this is just a new type around
115 // `Pool` that is expected to stay on the current thread.
116 CURRENT_WORKER.with(|c| {
117 c.set(self as *const _);
118
119 let pool = self.pool.clone();
120 let mut sender = Sender { pool };
121
122 // Enter an execution context
123 let mut enter = tokio_executor::enter().unwrap();
124
125 tokio_executor::with_default(&mut sender, &mut enter, |enter| {
126 if let Some(ref callback) = self.pool.config.around_worker {
127 callback.call(self, enter);
128 } else {
129 self.run();
130 }
131 });
132 });
133
134 // Can't be in blocking mode and finalization mode
135 debug_assert!(!self.is_blocking.get() || !self.should_finalize.get());
136
137 self.is_blocking.get()
138 }
139
140 pub(crate) fn with_current<F: FnOnce(Option<&Worker>) -> R, R>(f: F) -> R {
141 CURRENT_WORKER.with(move |c| {
142 let ptr = c.get();
143
144 if ptr.is_null() {
145 f(None)
146 } else {
147 f(Some(unsafe { &*ptr }))
148 }
149 })
150 }
151
152 /// Transition the current worker to a blocking worker
153 pub(crate) fn transition_to_blocking(&self) -> Poll<(), ::BlockingError> {
154 use self::CanBlock::*;
155
156 // If we get this far, then `current_task` has been set.
157 let task_ref = self.current_task.get_ref();
158
159 // First step is to acquire blocking capacity for the task.
160 match self.current_task.can_block() {
161 // Capacity to block has already been allocated to this task.
162 Allocated => {}
163
164 // The task has already requested capacity to block, but there is
165 // none yet available.
166 NoCapacity => return Ok(Async::NotReady),
167
168 // The task has yet to ask for capacity
169 CanRequest => {
170 // Atomically attempt to acquire blocking capacity, and if none
171 // is available, register the task to be notified once capacity
172 // becomes available.
173 match self.pool.poll_blocking_capacity(task_ref)? {
174 Async::Ready(()) => {
175 self.current_task.set_can_block(Allocated);
176 }
177 Async::NotReady => {
178 self.current_task.set_can_block(NoCapacity);
179 return Ok(Async::NotReady);
180 }
181 }
182 }
183 }
184
185 // The task has been allocated blocking capacity. At this point, this is
186 // when the current thread transitions from a worker to a backup thread.
187 // To do so requires handing over the worker to another backup thread.
188
189 if self.is_blocking.get() {
190 // The thread is already in blocking mode, so there is nothing else
191 // to do. Return `Ready` and allow the caller to block the thread.
192 return Ok(().into());
193 }
194
195 trace!("transition to blocking state");
196
197 // Transitioning to blocking requires handing over the worker state to
198 // another thread so that the work queue can continue to be processed.
199
200 self.pool.spawn_thread(self.id.clone(), &self.pool);
201
202 // Track that the thread has now fully entered the blocking state.
203 self.is_blocking.set(true);
204
205 Ok(().into())
206 }
207
208 /// Transition from blocking
209 pub(crate) fn transition_from_blocking(&self) {
210 // TODO: Attempt to take ownership of the worker again.
211 }
212
213 /// Returns a reference to the worker's identifier.
214 ///
215 /// This identifier is unique scoped by the thread pool. It is possible that
216 /// different thread pool instances share worker identifier values.
217 pub fn id(&self) -> &WorkerId {
218 &self.id
219 }
220
221 /// Run the worker
222 ///
223 /// This function blocks until the worker is shutting down.
224 pub fn run(&self) {
225 const MAX_SPINS: usize = 3;
226 const LIGHT_SLEEP_INTERVAL: usize = 32;
227
228 // Get the notifier.
229 let notify = Arc::new(Notifier {
230 pool: self.pool.clone(),
231 });
232
233 let mut first = true;
234 let mut spin_cnt = 0;
235 let mut tick = 0;
236
237 while self.check_run_state(first) {
238 first = false;
239
240 // Run the next available task
241 if self.try_run_task(¬ify) {
242 if self.is_blocking.get() {
243 // Exit out of the run state
244 return;
245 }
246
247 // Poll the reactor and the global queue every now and then to
248 // ensure no task gets left behind.
249 if tick % LIGHT_SLEEP_INTERVAL == 0 {
250 self.sleep_light();
251 }
252
253 tick = tick.wrapping_add(1);
254 spin_cnt = 0;
255
256 // As long as there is work, keep looping.
257 continue;
258 }
259
260 spin_cnt += 1;
261
262 // Yield the thread several times before it actually goes to sleep.
263 if spin_cnt <= MAX_SPINS {
264 thread::yield_now();
265 continue;
266 }
267
268 tick = 0;
269 spin_cnt = 0;
270
271 // Starting to get sleeeeepy
272 if !self.sleep() {
273 return;
274 }
275
276 // If there still isn't any work to do, shutdown the worker?
277 }
278
279 // The pool is terminating. However, transitioning the pool state to
280 // terminated is the very first step of the finalization process. Other
281 // threads may not see this state and try to spawn a new thread. To
282 // ensure consistency, before the current thread shuts down, it must
283 // return the backup token to the stack.
284 //
285 // The returned result is ignored because `Err` represents the pool
286 // shutting down. We are currently aware of this fact.
287 let _ = self.pool.release_backup(self.backup_id);
288
289 self.should_finalize.set(true);
290 }
291
292 /// Try to run a task
293 ///
294 /// Returns `true` if work was found.
295 #[inline]
296 fn try_run_task(&self, notify: &Arc<Notifier>) -> bool {
297 if self.try_run_owned_task(notify) {
298 return true;
299 }
300
301 self.try_steal_task(notify)
302 }
303
304 /// Checks the worker's current state, updating it as needed.
305 ///
306 /// Returns `true` if the worker should run.
307 #[inline]
308 fn check_run_state(&self, first: bool) -> bool {
309 use self::Lifecycle::*;
310
311 debug_assert!(!self.is_blocking.get());
312
313 let mut state: State = self.entry().state.load(Acquire).into();
314
315 loop {
316 let pool_state: pool::State = self.pool.state.load(Acquire).into();
317
318 if pool_state.is_terminated() {
319 return false;
320 }
321
322 let mut next = state;
323
324 match state.lifecycle() {
325 Running => break,
326 Notified | Signaled => {
327 // transition back to running
328 next.set_lifecycle(Running);
329 }
330 Shutdown | Sleeping => {
331 // The worker should never be in these states when calling
332 // this function.
333 panic!("unexpected worker state; lifecycle={:?}", state.lifecycle());
334 }
335 }
336
337 let actual = self
338 .entry()
339 .state
340 .compare_and_swap(state.into(), next.into(), AcqRel)
341 .into();
342
343 if actual == state {
344 break;
345 }
346
347 state = actual;
348 }
349
350 // `first` is set to true the first time this function is called after
351 // the thread has started.
352 //
353 // This check is to handle the scenario where a worker gets signaled
354 // while it is already happily running. The `is_signaled` state is
355 // intended to wake up a worker that has been previously sleeping in
356 // effect increasing the number of active workers. If this is the first
357 // time `check_run_state` is called, then being in a signalled state is
358 // normal and the thread was started to handle it. However, if this is
359 // **not** the first time the fn was called, then the number of active
360 // workers has not been increased by the signal, so `signal_work` has to
361 // be called again to try to wake up another worker.
362 //
363 // For example, if the thread pool is configured to allow 4 workers.
364 // Worker 1 is processing tasks from its `deque`. Worker 2 receives its
365 // first task. Worker 2 will pick a random worker to signal. It does
366 // this by popping off the sleep stack, but there is no guarantee that
367 // workers on the sleep stack are actually sleeping. It is possible that
368 // Worker 1 gets signaled.
369 //
370 // Without this check, in the above case, no additional workers will get
371 // started, which results in the thread pool permanently being at 2
372 // workers even though it should reach 4.
373 if !first && state.is_signaled() {
374 trace!("Worker::check_run_state; delegate signal");
375 // This worker is not ready to be signaled, so delegate the signal
376 // to another worker.
377 self.pool.signal_work(&self.pool);
378 }
379
380 true
381 }
382
383 /// Runs the next task on this worker's queue.
384 ///
385 /// Returns `true` if work was found.
386 fn try_run_owned_task(&self, notify: &Arc<Notifier>) -> bool {
387 // Poll the internal queue for a task to run
388 match self.entry().pop_task() {
389 Some(task) => {
390 self.run_task(task, notify);
391 true
392 }
393 None => false,
394 }
395 }
396
397 /// Tries to steal a task from another worker.
398 ///
399 /// Returns `true` if work was found
400 fn try_steal_task(&self, notify: &Arc<Notifier>) -> bool {
401 use crossbeam_deque::Steal;
402
403 debug_assert!(!self.is_blocking.get());
404
405 let len = self.pool.workers.len();
406 let mut idx = self.pool.rand_usize() % len;
407 let mut found_work = false;
408 let start = idx;
409
410 loop {
411 if idx < len {
412 match self.pool.workers[idx].steal_tasks(self.entry()) {
413 Steal::Success(task) => {
414 trace!("stole task from another worker");
415
416 self.run_task(task, notify);
417
418 trace!(
419 "try_steal_task -- signal_work; self={}; from={}",
420 self.id.0,
421 idx
422 );
423
424 // Signal other workers that work is available
425 //
426 // TODO: Should this be called here or before
427 // `run_task`?
428 self.pool.signal_work(&self.pool);
429
430 return true;
431 }
432 Steal::Empty => {}
433 Steal::Retry => found_work = true,
434 }
435
436 idx += 1;
437 } else {
438 idx = 0;
439 }
440
441 if idx == start {
442 break;
443 }
444 }
445
446 found_work
447 }
448
449 fn run_task(&self, task: Arc<Task>, notify: &Arc<Notifier>) {
450 use task::Run::*;
451
452 // If this is the first time this task is being polled, register it so that we can keep
453 // track of tasks that are in progress.
454 if task.reg_worker.get().is_none() {
455 task.reg_worker.set(Some(self.id.0 as u32));
456 self.entry().register_task(&task);
457 }
458
459 let run = self.run_task2(&task, notify);
460
461 // TODO: Try to claim back the worker state in case the backup thread
462 // did not start up fast enough. This is a performance optimization.
463
464 match run {
465 Idle => {}
466 Schedule => {
467 if self.is_blocking.get() {
468 // The future has been notified while it was running.
469 // However, the future also entered a blocking section,
470 // which released the worker state from this thread.
471 //
472 // This means that scheduling the future must be done from
473 // a point of view external to the worker set.
474 //
475 // We have to call `submit_external` instead of `submit`
476 // here because `self` is still set as the current worker.
477 self.pool.submit_external(task, &self.pool);
478 } else {
479 self.entry().push_internal(task);
480 }
481 }
482 Complete => {
483 let mut state: pool::State = self.pool.state.load(Acquire).into();
484
485 loop {
486 let mut next = state;
487 next.dec_num_futures();
488
489 let actual = self
490 .pool
491 .state
492 .compare_and_swap(state.into(), next.into(), AcqRel)
493 .into();
494
495 if actual == state {
496 trace!("task complete; state={:?}", next);
497
498 if state.num_futures() == 1 {
499 // If the thread pool has been flagged as shutdown,
500 // start terminating workers. This involves waking
501 // up any sleeping worker so that they can notice
502 // the shutdown state.
503 if next.is_terminated() {
504 self.pool.terminate_sleeping_workers();
505 }
506 }
507
508 // Find which worker polled this task first.
509 let worker = task.reg_worker.get().unwrap() as usize;
510
511 // Unregister the task from the worker it was registered in.
512 if !self.is_blocking.get() && worker == self.id.0 {
513 self.entry().unregister_task(task);
514 } else {
515 self.pool.workers[worker].remotely_complete_task(task);
516 }
517
518 // The worker's run loop will detect the shutdown state
519 // next iteration.
520 return;
521 }
522
523 state = actual;
524 }
525 }
526 }
527 }
528
529 /// Actually run the task. This is where `Worker::current_task` is set.
530 ///
531 /// Great care is needed to ensure that `current_task` is unset in this
532 /// function.
533 fn run_task2(&self, task: &Arc<Task>, notify: &Arc<Notifier>) -> task::Run {
534 struct Guard<'a> {
535 worker: &'a Worker,
536 }
537
538 impl<'a> Drop for Guard<'a> {
539 fn drop(&mut self) {
540 // A task is allocated at run when it was explicitly notified
541 // that the task has capacity to block. When this happens, that
542 // capacity is automatically allocated to the notified task.
543 // This capacity is "use it or lose it", so if the thread is not
544 // transitioned to blocking in this call, then another task has
545 // to be notified.
546 //
547 // If the task has consumed its blocking allocation but hasn't
548 // used it, it must be given to some other task instead.
549 if !self.worker.is_blocking.get() {
550 let can_block = self.worker.current_task.can_block();
551 if can_block == CanBlock::Allocated {
552 self.worker.pool.notify_blocking_task(&self.worker.pool);
553 }
554 }
555
556 self.worker.current_task.clear();
557 }
558 }
559
560 // Set `current_task`
561 self.current_task.set(task, CanBlock::CanRequest);
562
563 // Create the guard, this ensures that `current_task` is unset when the
564 // function returns, even if the return is caused by a panic.
565 let _g = Guard { worker: self };
566
567 task.run(notify)
568 }
569
570 /// Put the worker to sleep
571 ///
572 /// Returns `true` if woken up due to new work arriving.
573 fn sleep(&self) -> bool {
574 use self::Lifecycle::*;
575
576 // Putting a worker to sleep is a multipart operation. This is, in part,
577 // due to the fact that a worker can be notified without it being popped
578 // from the sleep stack. Extra care is needed to deal with this.
579
580 trace!("Worker::sleep; worker={:?}", self.id);
581
582 let mut state: State = self.entry().state.load(Acquire).into();
583
584 // The first part of the sleep process is to transition the worker state
585 // to "pushed". Now, it may be that the worker is already pushed on the
586 // sleeper stack, in which case, we don't push again.
587
588 loop {
589 let mut next = state;
590
591 match state.lifecycle() {
592 Running => {
593 // Try setting the pushed state
594 next.set_pushed();
595
596 // Transition the worker state to sleeping
597 next.set_lifecycle(Sleeping);
598 }
599 Notified | Signaled => {
600 // No need to sleep, transition back to running and move on.
601 next.set_lifecycle(Running);
602 }
603 Shutdown | Sleeping => {
604 // The worker cannot transition to sleep when already in a
605 // sleeping state.
606 panic!("unexpected worker state; actual={:?}", state.lifecycle());
607 }
608 }
609
610 let actual = self
611 .entry()
612 .state
613 .compare_and_swap(state.into(), next.into(), AcqRel)
614 .into();
615
616 if actual == state {
617 if state.is_notified() {
618 // The previous state was notified, so we don't need to
619 // sleep.
620 return true;
621 }
622
623 if !state.is_pushed() {
624 debug_assert!(next.is_pushed());
625
626 trace!(" sleeping -- push to stack; idx={}", self.id.0);
627
628 // We obtained permission to push the worker into the
629 // sleeper queue.
630 if let Err(_) = self.pool.push_sleeper(self.id.0) {
631 trace!(" sleeping -- push to stack failed; idx={}", self.id.0);
632 // The push failed due to the pool being terminated.
633 //
634 // This is true because the "work" being woken up for is
635 // shutting down.
636 return true;
637 }
638 }
639
640 break;
641 }
642
643 state = actual;
644 }
645
646 trace!(" -> starting to sleep; idx={}", self.id.0);
647
648 // Do a quick check to see if there are any notifications in the
649 // reactor or new tasks in the global queue. Since this call will
650 // clear the wakeup token, we need to check the state again and
651 // only after that go to sleep.
652 self.sleep_light();
653
654 // The state has been transitioned to sleeping, we can now wait by
655 // calling the parker. This is done in a loop as condvars can wakeup
656 // spuriously.
657 loop {
658 // Reload the state
659 state = self.entry().state.load(Acquire).into();
660
661 // If the worker has been notified, transition back to running.
662 match state.lifecycle() {
663 Sleeping => {
664 // Still sleeping. Park again.
665 }
666 Notified | Signaled => {
667 // Transition back to running
668 loop {
669 let mut next = state;
670 next.set_lifecycle(Running);
671
672 let actual = self
673 .entry()
674 .state
675 .compare_and_swap(state.into(), next.into(), AcqRel)
676 .into();
677
678 if actual == state {
679 return true;
680 }
681
682 state = actual;
683 }
684 }
685 Shutdown | Running => {
686 // To get here, the block above transitioned the state to
687 // `Sleeping`. No other thread can concurrently
688 // transition to `Shutdown` or `Running`.
689 unreachable!();
690 }
691 }
692
693 self.entry().park();
694
695 trace!(" -> wakeup; idx={}", self.id.0);
696 }
697 }
698
699 /// This doesn't actually put the thread to sleep. It calls
700 /// `park.park_timeout` with a duration of 0. This allows the park
701 /// implementation to perform any work that might be done on an interval.
702 ///
703 /// Returns `true` if this worker has tasks in its queue.
704 fn sleep_light(&self) {
705 self.entry().park_timeout(Duration::from_millis(0));
706
707 use crossbeam_deque::Steal;
708 loop {
709 match self.pool.queue.steal_batch(&self.entry().worker) {
710 Steal::Success(()) => {
711 self.pool.signal_work(&self.pool);
712 break;
713 }
714 Steal::Empty => break,
715 Steal::Retry => {}
716 }
717 }
718 }
719
720 fn entry(&self) -> &Entry {
721 debug_assert!(!self.is_blocking.get());
722 &self.pool.workers[self.id.0]
723 }
724}
725
726impl Drop for Worker {
727 fn drop(&mut self) {
728 trace!("shutting down thread; idx={}", self.id.0);
729
730 if self.should_finalize.get() {
731 // Drain the work queue
732 self.entry().drain_tasks();
733 }
734 }
735}
736
737// ===== impl CurrentTask =====
738
739impl CurrentTask {
740 /// Returns a default `CurrentTask` representing no task.
741 fn new() -> CurrentTask {
742 CurrentTask {
743 task: Cell::new(None),
744 can_block: Cell::new(CanBlock::CanRequest),
745 }
746 }
747
748 /// Returns a reference to the task.
749 fn get_ref(&self) -> &Arc<Task> {
750 unsafe { &*self.task.get().unwrap() }
751 }
752
753 fn can_block(&self) -> CanBlock {
754 use self::CanBlock::*;
755
756 match self.can_block.get() {
757 Allocated => Allocated,
758 CanRequest | NoCapacity => {
759 let can_block = self.get_ref().consume_blocking_allocation();
760 self.can_block.set(can_block);
761 can_block
762 }
763 }
764 }
765
766 fn set_can_block(&self, can_block: CanBlock) {
767 self.can_block.set(can_block);
768 }
769
770 fn set(&self, task: &Arc<Task>, can_block: CanBlock) {
771 self.task.set(Some(task as *const _));
772 self.can_block.set(can_block);
773 }
774
775 /// Reset the `CurrentTask` to null state.
776 fn clear(&self) {
777 self.task.set(None);
778 self.can_block.set(CanBlock::CanRequest);
779 }
780}
781
782// ===== impl WorkerId =====
783
784impl WorkerId {
785 /// Returns a `WorkerId` representing the worker entry at index `idx`.
786 pub(crate) fn new(idx: usize) -> WorkerId {
787 WorkerId(idx)
788 }
789
790 /// Returns this identifier represented as an integer.
791 ///
792 /// Worker identifiers in a single thread pool are guaranteed to correspond to integers in the
793 /// range `0..pool_size`.
794 pub fn to_usize(&self) -> usize {
795 self.0
796 }
797}