easy_parallel/lib.rs
1//! Run closures in parallel.
2//!
3//! This is a simple primitive for spawning threads in bulk and waiting for them to complete.
4//! Threads are allowed to borrow local variables from the main thread.
5//!
6//! # Examples
7//!
8//! Run two threads that increment a number:
9//!
10//! ```
11//! use easy_parallel::Parallel;
12//! use std::sync::Mutex;
13//!
14//! let mut m = Mutex::new(0);
15//!
16//! Parallel::new()
17//! .add(|| *m.lock().unwrap() += 1)
18//! .add(|| *m.lock().unwrap() += 1)
19//! .run();
20//!
21//! assert_eq!(*m.get_mut().unwrap(), 2);
22//! ```
23//!
24//! Square each number of a vector on a different thread:
25//!
26//! ```
27//! use easy_parallel::Parallel;
28//!
29//! let v = vec![10, 20, 30];
30//!
31//! let squares = Parallel::new()
32//! .each(0..v.len(), |i| v[i] * v[i])
33//! .run();
34//!
35//! assert_eq!(squares, [100, 400, 900]);
36//! ```
37//!
38//! Compute the sum of numbers in an array:
39//!
40//! ```
41//! use easy_parallel::Parallel;
42//!
43//! fn par_sum(v: &[i32]) -> i32 {
44//! const THRESHOLD: usize = 2;
45//!
46//! if v.len() <= THRESHOLD {
47//! v.iter().copied().sum()
48//! } else {
49//! let half = (v.len() + 1) / 2;
50//! let sums = Parallel::new().each(v.chunks(half), par_sum).run();
51//! sums.into_iter().sum()
52//! }
53//! }
54//!
55//! let v = [1, 25, -4, 10, 8];
56//! assert_eq!(par_sum(&v), 40);
57//! ```
58
59#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
60#![forbid(unsafe_code)]
61#![doc(
62 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
63)]
64#![doc(
65 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
66)]
67
68use std::fmt;
69use std::iter::{self, FromIterator};
70use std::panic;
71use std::sync::mpsc;
72use std::thread;
73
/// A builder that runs closures in parallel.
///
/// Closures are queued with [`Parallel::add`] and [`Parallel::each`]. Nothing runs until one of
/// the finishing methods ([`Parallel::run`], [`Parallel::collect`], [`Parallel::finish`] or
/// [`Parallel::finish_in`]) consumes the builder.
#[must_use]
pub struct Parallel<'a, T> {
    /// Closures to run, in the order in which they were added.
    closures: Vec<Box<dyn FnOnce() -> T + Send + 'a>>,
}

impl<'a, T> Parallel<'a, T> {
    /// Creates a builder for running closures in parallel.
    ///
    /// The new builder holds no closures.
    ///
    /// # Examples
    ///
    /// ```
    /// use easy_parallel::Parallel;
    ///
    /// let p = Parallel::<()>::new();
    /// ```
    pub fn new() -> Parallel<'a, T> {
        Parallel {
            closures: Vec::new(),
        }
    }

    /// Adds a closure to the list.
    ///
    /// The closure is only stored here; it is executed by one of the finishing methods.
    ///
    /// # Examples
    ///
    /// ```
    /// use easy_parallel::Parallel;
    ///
    /// Parallel::new()
    ///     .add(|| println!("hello from a thread"))
    ///     .run();
    /// ```
    // The name collides with `std::ops::Add::add`, but this is a builder method that queues a
    // closure, not an arithmetic operator, so implementing the trait would be misleading.
    #[allow(clippy::should_implement_trait)]
    pub fn add<F>(mut self, f: F) -> Parallel<'a, T>
    where
        F: FnOnce() -> T + Send + 'a,
        T: Send + 'a,
    {
        self.closures.push(Box::new(f));
        self
    }

    /// Adds a cloned closure for each item in an iterator.
    ///
    /// Each clone of the closure takes an item as an argument. Closures are queued in the order
    /// in which the iterator yields its items.
    ///
    /// # Examples
    ///
    /// ```
    /// use easy_parallel::Parallel;
    ///
    /// Parallel::new()
    ///     .each(0..5, |i| println!("hello from thread #{}", i))
    ///     .run();
    /// ```
    pub fn each<A, I, F>(mut self, iter: I, f: F) -> Parallel<'a, T>
    where
        I: IntoIterator<Item = A>,
        F: FnOnce(A) -> T + Clone + Send + 'a,
        A: Send + 'a,
        T: Send + 'a,
    {
        // Pair every item with its own clone of `f`. The explicit return type performs the
        // coercion from the concrete closure to the boxed trait object stored in the builder.
        let jobs = iter
            .into_iter()
            .map(|item| -> Box<dyn FnOnce() -> T + Send + 'a> {
                let f = f.clone();
                Box::new(move || f(item))
            });

        // `extend` lets the vector reserve space up front from the iterator's size hint.
        self.closures.extend(jobs);
        self
    }

    /// Runs each closure on a separate thread and collects their results.
    ///
    /// Results are collected in the order in which closures were added. The closure that was
    /// added last always runs on the calling thread because there is no point in spawning an
    /// extra thread for it. If no closures were added, an empty collection is returned.
    ///
    /// # Panics
    ///
    /// If a closure panics, panicking will resume in the calling thread after all threads are
    /// joined.
    ///
    /// # Examples
    ///
    /// ```
    /// use easy_parallel::Parallel;
    ///
    /// let res = Parallel::new()
    ///     .each(1..=3, |i| 10 * i)
    ///     .add(|| 100)
    ///     .collect::<Vec<_>>();
    ///
    /// assert_eq!(res, [10, 20, 30, 100]);
    /// ```
    pub fn collect<C>(mut self) -> C
    where
        T: Send + 'a,
        C: FromIterator<T> + Extend<T>,
    {
        // Reserve the last closure for the current thread; with nothing queued there is
        // nothing to run.
        let last = match self.closures.pop() {
            Some(last) => last,
            None => return iter::empty().collect(),
        };

        // Spawn threads for the rest, run the last closure here, then append its result so the
        // output keeps insertion order.
        let (mut results, tail) = self.finish_in::<_, _, C>(last);
        results.extend(iter::once(tail));
        results
    }

    /// Runs each closure on a separate thread and collects their results into a `Vec`.
    ///
    /// This is [`Parallel::collect`] with the output container fixed to `Vec<T>`: results are in
    /// the order in which closures were added, and the closure added last runs on the calling
    /// thread.
    ///
    /// # Panics
    ///
    /// If a closure panics, panicking will resume in the calling thread after all threads are
    /// joined.
    ///
    /// # Examples
    ///
    /// ```
    /// use easy_parallel::Parallel;
    ///
    /// let res = Parallel::new()
    ///     .each(1..=3, |i| 10 * i)
    ///     .add(|| 100)
    ///     .run();
    ///
    /// assert_eq!(res, [10, 20, 30, 100]);
    /// ```
    pub fn run(self) -> Vec<T>
    where
        T: Send + 'a,
    {
        self.collect()
    }

    /// Finishes with a closure to run on the calling thread, starts threads, and collects
    /// results.
    ///
    /// Results are collected in the order in which closures were added. The value returned by
    /// `f` is handed back as the second element of the tuple.
    ///
    /// # Panics
    ///
    /// If a closure panics, panicking will resume in the calling thread after all threads are
    /// joined.
    ///
    /// # Examples
    ///
    /// ```
    /// use easy_parallel::Parallel;
    ///
    /// let (res, ()) = Parallel::new()
    ///     .each(1..=3, |i| 10 * i)
    ///     .finish(|| println!("Waiting for results"));
    ///
    /// assert_eq!(res, [10, 20, 30]);
    /// ```
    pub fn finish<F, R>(self, f: F) -> (Vec<T>, R)
    where
        F: FnOnce() -> R,
        T: Send + 'a,
    {
        self.finish_in::<_, _, Vec<T>>(f)
    }

    /// Finishes with a closure to run on the calling thread, starts threads, and collects
    /// results into an arbitrary container.
    ///
    /// Every queued closure is spawned on its own scoped thread, then `f` runs on the calling
    /// thread. Results are collected in the order in which closures were added.
    ///
    /// # Panics
    ///
    /// If a closure panics, panicking will resume in the calling thread after all threads are
    /// joined. A panic from a spawned closure takes precedence over a panic from `f`; when
    /// several spawned closures panic, the panic of the one joined last is resumed.
    ///
    /// # Examples
    ///
    /// ```
    /// use easy_parallel::Parallel;
    ///
    /// let (res, ()) = Parallel::new()
    ///     .each(1..=3, |i| 10 * i)
    ///     .finish_in::<_, _, Vec<i32>>(|| println!("Waiting for results"));
    ///
    /// assert_eq!(res, [10, 20, 30]);
    /// ```
    pub fn finish_in<F, R, C>(self, f: F) -> (C, R)
    where
        F: FnOnce() -> R,
        T: Send + 'a,
        C: FromIterator<T>,
    {
        // Set up a new thread scope so spawned threads may borrow from the caller.
        thread::scope(|scope| {
            let count = self.closures.len();

            // Join handles for spawned threads.
            let mut handles = Vec::with_capacity(count);

            // One channel per thread carries its result back to this thread.
            let mut receivers = Vec::with_capacity(count);

            for job in self.closures {
                let (sender, receiver) = mpsc::channel();

                // Wrap the job so that it sends its result back once it completes.
                handles.push(scope.spawn(move || {
                    sender
                        .send(job())
                        .expect("result receiver was dropped before the worker finished")
                }));
                receivers.push(receiver);
            }

            // Run the main closure on the current thread. Its panic (if any) is captured so the
            // spawned threads can still be joined before anything is resumed.
            let main_outcome = panic::catch_unwind(panic::AssertUnwindSafe(f));

            // Join every thread and remember the last panic if there was one.
            let mut last_panic = None;
            for handle in handles {
                if let Err(payload) = handle.join() {
                    last_panic = Some(payload);
                }
            }

            // If a thread has panicked, resume the last collected panic.
            if let Some(payload) = last_panic {
                panic::resume_unwind(payload);
            }

            // Every thread finished without panicking, so each channel holds exactly one value.
            let results = receivers
                .into_iter()
                .map(|receiver| {
                    receiver
                        .recv()
                        .expect("worker thread finished without sending its result")
                })
                .collect();

            // If the main closure panicked, resume its panic.
            match main_outcome {
                Ok(value) => (results, value),
                Err(payload) => panic::resume_unwind(payload),
            }
        })
    }
}
310
311impl<T> fmt::Debug for Parallel<'_, T> {
312 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313 f.debug_struct("Parallel")
314 .field("len", &self.closures.len())
315 .finish()
316 }
317}
318
319impl<T> Default for Parallel<'_, T> {
320 fn default() -> Self {
321 Self::new()
322 }
323}