tokio_threadpool/sender.rs
1use pool::{self, Lifecycle, Pool, MAX_FUTURES};
2use task::Task;
3
4use std::sync::atomic::Ordering::{AcqRel, Acquire};
5use std::sync::Arc;
6
7use futures::{future, Future};
8use tokio_executor::{self, SpawnError};
9
10/// Submit futures to the associated thread pool for execution.
11///
12/// A `Sender` instance is a handle to a single thread pool, allowing the owner
13/// of the handle to spawn futures onto the thread pool. New futures are spawned
14/// using [`Sender::spawn`].
15///
16/// The `Sender` handle is *only* used for spawning new futures. It does not
17/// impact the lifecycle of the thread pool in any way.
18///
19/// `Sender` instances are obtained by calling [`ThreadPool::sender`]. The
20/// `Sender` struct implements the `Executor` trait.
21///
22/// [`Sender::spawn`]: #method.spawn
23/// [`ThreadPool::sender`]: struct.ThreadPool.html#method.sender
24#[derive(Debug)]
25pub struct Sender {
26 pub(crate) pool: Arc<Pool>,
27}
28
29impl Sender {
30 /// Spawn a future onto the thread pool
31 ///
32 /// This function takes ownership of the future and spawns it onto the
33 /// thread pool, assigning it to a worker thread. The exact strategy used to
34 /// assign a future to a worker depends on if the caller is already on a
35 /// worker thread or external to the thread pool.
36 ///
37 /// If the caller is currently on the thread pool, the spawned future will
38 /// be assigned to the same worker that the caller is on. If the caller is
39 /// external to the thread pool, the future will be assigned to a random
40 /// worker.
41 ///
42 /// If `spawn` returns `Ok`, this does not mean that the future will be
43 /// executed. The thread pool can be forcibly shutdown between the time
44 /// `spawn` is called and the future has a chance to execute.
45 ///
46 /// If `spawn` returns `Err`, then the future failed to be spawned. There
47 /// are two possible causes:
48 ///
49 /// * The thread pool is at capacity and is unable to spawn a new future.
50 /// This is a temporary failure. At some point in the future, the thread
51 /// pool might be able to spawn new futures.
52 /// * The thread pool is shutdown. This is a permanent failure indicating
53 /// that the handle will never be able to spawn new futures.
54 ///
55 /// The status of the thread pool can be queried before calling `spawn`
56 /// using the `status` function (part of the `Executor` trait).
57 ///
58 /// # Examples
59 ///
60 /// ```rust
61 /// # extern crate tokio_threadpool;
62 /// # extern crate futures;
63 /// # use tokio_threadpool::ThreadPool;
64 /// use futures::future::{Future, lazy};
65 ///
66 /// # pub fn main() {
67 /// // Create a thread pool with default configuration values
68 /// let thread_pool = ThreadPool::new();
69 ///
70 /// thread_pool.sender().spawn(lazy(|| {
71 /// println!("called from a worker thread");
72 /// Ok(())
73 /// })).unwrap();
74 ///
75 /// // Gracefully shutdown the threadpool
76 /// thread_pool.shutdown().wait().unwrap();
77 /// # }
78 /// ```
79 pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
80 where
81 F: Future<Item = (), Error = ()> + Send + 'static,
82 {
83 let mut s = self;
84 tokio_executor::Executor::spawn(&mut s, Box::new(future))
85 }
86
87 /// Logic to prepare for spawning
88 fn prepare_for_spawn(&self) -> Result<(), SpawnError> {
89 let mut state: pool::State = self.pool.state.load(Acquire).into();
90
91 // Increment the number of futures spawned on the pool as well as
92 // validate that the pool is still running/
93 loop {
94 let mut next = state;
95
96 if next.num_futures() == MAX_FUTURES {
97 // No capacity
98 return Err(SpawnError::at_capacity());
99 }
100
101 if next.lifecycle() == Lifecycle::ShutdownNow {
102 // Cannot execute the future, executor is shutdown.
103 return Err(SpawnError::shutdown());
104 }
105
106 next.inc_num_futures();
107
108 let actual = self
109 .pool
110 .state
111 .compare_and_swap(state.into(), next.into(), AcqRel)
112 .into();
113
114 if actual == state {
115 trace!("execute; count={:?}", next.num_futures());
116 break;
117 }
118
119 state = actual;
120 }
121
122 Ok(())
123 }
124}
125
126impl tokio_executor::Executor for Sender {
127 fn status(&self) -> Result<(), tokio_executor::SpawnError> {
128 let s = self;
129 tokio_executor::Executor::status(&s)
130 }
131
132 fn spawn(
133 &mut self,
134 future: Box<dyn Future<Item = (), Error = ()> + Send>,
135 ) -> Result<(), SpawnError> {
136 let mut s = &*self;
137 tokio_executor::Executor::spawn(&mut s, future)
138 }
139}
140
141impl<'a> tokio_executor::Executor for &'a Sender {
142 fn status(&self) -> Result<(), tokio_executor::SpawnError> {
143 let state: pool::State = self.pool.state.load(Acquire).into();
144
145 if state.num_futures() == MAX_FUTURES {
146 // No capacity
147 return Err(SpawnError::at_capacity());
148 }
149
150 if state.lifecycle() == Lifecycle::ShutdownNow {
151 // Cannot execute the future, executor is shutdown.
152 return Err(SpawnError::shutdown());
153 }
154
155 Ok(())
156 }
157
158 fn spawn(
159 &mut self,
160 future: Box<dyn Future<Item = (), Error = ()> + Send>,
161 ) -> Result<(), SpawnError> {
162 self.prepare_for_spawn()?;
163
164 // At this point, the pool has accepted the future, so schedule it for
165 // execution.
166
167 // Create a new task for the future
168 let task = Arc::new(Task::new(future));
169
170 // Call `submit_external()` in order to place the task into the global
171 // queue. This way all workers have equal chance of running this task,
172 // which means IO handles will be assigned to reactors more evenly.
173 self.pool.submit_external(task, &self.pool);
174
175 Ok(())
176 }
177}
178
179impl<T> tokio_executor::TypedExecutor<T> for Sender
180where
181 T: Future<Item = (), Error = ()> + Send + 'static,
182{
183 fn status(&self) -> Result<(), tokio_executor::SpawnError> {
184 tokio_executor::Executor::status(self)
185 }
186
187 fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
188 tokio_executor::Executor::spawn(self, Box::new(future))
189 }
190}
191
192impl<T> future::Executor<T> for Sender
193where
194 T: Future<Item = (), Error = ()> + Send + 'static,
195{
196 fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
197 if let Err(e) = tokio_executor::Executor::status(self) {
198 let kind = if e.is_at_capacity() {
199 future::ExecuteErrorKind::NoCapacity
200 } else {
201 future::ExecuteErrorKind::Shutdown
202 };
203
204 return Err(future::ExecuteError::new(kind, future));
205 }
206
207 let _ = self.spawn(future);
208 Ok(())
209 }
210}
211
212impl Clone for Sender {
213 #[inline]
214 fn clone(&self) -> Sender {
215 let pool = self.pool.clone();
216 Sender { pool }
217 }
218}