tokio_threadpool/
sender.rs

1use pool::{self, Lifecycle, Pool, MAX_FUTURES};
2use task::Task;
3
4use std::sync::atomic::Ordering::{AcqRel, Acquire};
5use std::sync::Arc;
6
7use futures::{future, Future};
8use tokio_executor::{self, SpawnError};
9
/// Submit futures to the associated thread pool for execution.
///
/// A `Sender` instance is a handle to a single thread pool, allowing the owner
/// of the handle to spawn futures onto the thread pool. New futures are spawned
/// using [`Sender::spawn`].
///
/// The `Sender` handle is *only* used for spawning new futures. It does not
/// impact the lifecycle of the thread pool in any way.
///
/// `Sender` instances are obtained by calling [`ThreadPool::sender`]. The
/// `Sender` struct implements the `Executor` trait.
///
/// Cloning a `Sender` yields another handle that refers to the same thread
/// pool.
///
/// [`Sender::spawn`]: #method.spawn
/// [`ThreadPool::sender`]: struct.ThreadPool.html#method.sender
#[derive(Debug)]
pub struct Sender {
    // Shared reference to the pool that spawned futures are submitted to.
    pub(crate) pool: Arc<Pool>,
}
28
29impl Sender {
30    /// Spawn a future onto the thread pool
31    ///
32    /// This function takes ownership of the future and spawns it onto the
33    /// thread pool, assigning it to a worker thread. The exact strategy used to
34    /// assign a future to a worker depends on if the caller is already on a
35    /// worker thread or external to the thread pool.
36    ///
37    /// If the caller is currently on the thread pool, the spawned future will
38    /// be assigned to the same worker that the caller is on. If the caller is
39    /// external to the thread pool, the future will be assigned to a random
40    /// worker.
41    ///
42    /// If `spawn` returns `Ok`, this does not mean that the future will be
43    /// executed. The thread pool can be forcibly shutdown between the time
44    /// `spawn` is called and the future has a chance to execute.
45    ///
46    /// If `spawn` returns `Err`, then the future failed to be spawned. There
47    /// are two possible causes:
48    ///
49    /// * The thread pool is at capacity and is unable to spawn a new future.
50    ///   This is a temporary failure. At some point in the future, the thread
51    ///   pool might be able to spawn new futures.
52    /// * The thread pool is shutdown. This is a permanent failure indicating
53    ///   that the handle will never be able to spawn new futures.
54    ///
55    /// The status of the thread pool can be queried before calling `spawn`
56    /// using the `status` function (part of the `Executor` trait).
57    ///
58    /// # Examples
59    ///
60    /// ```rust
61    /// # extern crate tokio_threadpool;
62    /// # extern crate futures;
63    /// # use tokio_threadpool::ThreadPool;
64    /// use futures::future::{Future, lazy};
65    ///
66    /// # pub fn main() {
67    /// // Create a thread pool with default configuration values
68    /// let thread_pool = ThreadPool::new();
69    ///
70    /// thread_pool.sender().spawn(lazy(|| {
71    ///     println!("called from a worker thread");
72    ///     Ok(())
73    /// })).unwrap();
74    ///
75    /// // Gracefully shutdown the threadpool
76    /// thread_pool.shutdown().wait().unwrap();
77    /// # }
78    /// ```
79    pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
80    where
81        F: Future<Item = (), Error = ()> + Send + 'static,
82    {
83        let mut s = self;
84        tokio_executor::Executor::spawn(&mut s, Box::new(future))
85    }
86
87    /// Logic to prepare for spawning
88    fn prepare_for_spawn(&self) -> Result<(), SpawnError> {
89        let mut state: pool::State = self.pool.state.load(Acquire).into();
90
91        // Increment the number of futures spawned on the pool as well as
92        // validate that the pool is still running/
93        loop {
94            let mut next = state;
95
96            if next.num_futures() == MAX_FUTURES {
97                // No capacity
98                return Err(SpawnError::at_capacity());
99            }
100
101            if next.lifecycle() == Lifecycle::ShutdownNow {
102                // Cannot execute the future, executor is shutdown.
103                return Err(SpawnError::shutdown());
104            }
105
106            next.inc_num_futures();
107
108            let actual = self
109                .pool
110                .state
111                .compare_and_swap(state.into(), next.into(), AcqRel)
112                .into();
113
114            if actual == state {
115                trace!("execute; count={:?}", next.num_futures());
116                break;
117            }
118
119            state = actual;
120        }
121
122        Ok(())
123    }
124}
125
126impl tokio_executor::Executor for Sender {
127    fn status(&self) -> Result<(), tokio_executor::SpawnError> {
128        let s = self;
129        tokio_executor::Executor::status(&s)
130    }
131
132    fn spawn(
133        &mut self,
134        future: Box<dyn Future<Item = (), Error = ()> + Send>,
135    ) -> Result<(), SpawnError> {
136        let mut s = &*self;
137        tokio_executor::Executor::spawn(&mut s, future)
138    }
139}
140
141impl<'a> tokio_executor::Executor for &'a Sender {
142    fn status(&self) -> Result<(), tokio_executor::SpawnError> {
143        let state: pool::State = self.pool.state.load(Acquire).into();
144
145        if state.num_futures() == MAX_FUTURES {
146            // No capacity
147            return Err(SpawnError::at_capacity());
148        }
149
150        if state.lifecycle() == Lifecycle::ShutdownNow {
151            // Cannot execute the future, executor is shutdown.
152            return Err(SpawnError::shutdown());
153        }
154
155        Ok(())
156    }
157
158    fn spawn(
159        &mut self,
160        future: Box<dyn Future<Item = (), Error = ()> + Send>,
161    ) -> Result<(), SpawnError> {
162        self.prepare_for_spawn()?;
163
164        // At this point, the pool has accepted the future, so schedule it for
165        // execution.
166
167        // Create a new task for the future
168        let task = Arc::new(Task::new(future));
169
170        // Call `submit_external()` in order to place the task into the global
171        // queue. This way all workers have equal chance of running this task,
172        // which means IO handles will be assigned to reactors more evenly.
173        self.pool.submit_external(task, &self.pool);
174
175        Ok(())
176    }
177}
178
179impl<T> tokio_executor::TypedExecutor<T> for Sender
180where
181    T: Future<Item = (), Error = ()> + Send + 'static,
182{
183    fn status(&self) -> Result<(), tokio_executor::SpawnError> {
184        tokio_executor::Executor::status(self)
185    }
186
187    fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
188        tokio_executor::Executor::spawn(self, Box::new(future))
189    }
190}
191
192impl<T> future::Executor<T> for Sender
193where
194    T: Future<Item = (), Error = ()> + Send + 'static,
195{
196    fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
197        if let Err(e) = tokio_executor::Executor::status(self) {
198            let kind = if e.is_at_capacity() {
199                future::ExecuteErrorKind::NoCapacity
200            } else {
201                future::ExecuteErrorKind::Shutdown
202            };
203
204            return Err(future::ExecuteError::new(kind, future));
205        }
206
207        let _ = self.spawn(future);
208        Ok(())
209    }
210}
211
212impl Clone for Sender {
213    #[inline]
214    fn clone(&self) -> Sender {
215        let pool = self.pool.clone();
216        Sender { pool }
217    }
218}