easy_parallel/
lib.rs

1//! Run closures in parallel.
2//!
3//! This is a simple primitive for spawning threads in bulk and waiting for them to complete.
4//! Threads are allowed to borrow local variables from the main thread.
5//!
6//! # Examples
7//!
8//! Run two threads that increment a number:
9//!
10//! ```
11//! use easy_parallel::Parallel;
12//! use std::sync::Mutex;
13//!
14//! let mut m = Mutex::new(0);
15//!
16//! Parallel::new()
17//!     .add(|| *m.lock().unwrap() += 1)
18//!     .add(|| *m.lock().unwrap() += 1)
19//!     .run();
20//!
21//! assert_eq!(*m.get_mut().unwrap(), 2);
22//! ```
23//!
24//! Square each number of a vector on a different thread:
25//!
26//! ```
27//! use easy_parallel::Parallel;
28//!
29//! let v = vec![10, 20, 30];
30//!
31//! let squares = Parallel::new()
32//!     .each(0..v.len(), |i| v[i] * v[i])
33//!     .run();
34//!
35//! assert_eq!(squares, [100, 400, 900]);
36//! ```
37//!
38//! Compute the sum of numbers in an array:
39//!
40//! ```
41//! use easy_parallel::Parallel;
42//!
43//! fn par_sum(v: &[i32]) -> i32 {
44//!     const THRESHOLD: usize = 2;
45//!
46//!     if v.len() <= THRESHOLD {
47//!         v.iter().copied().sum()
48//!     } else {
49//!         let half = (v.len() + 1) / 2;
50//!         let sums = Parallel::new().each(v.chunks(half), par_sum).run();
51//!         sums.into_iter().sum()
52//!     }
53//! }
54//!
55//! let v = [1, 25, -4, 10, 8];
56//! assert_eq!(par_sum(&v), 40);
57//! ```
58
59#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
60#![forbid(unsafe_code)]
61#![doc(
62    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
63)]
64#![doc(
65    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
66)]
67
68use std::fmt;
69use std::iter::{self, FromIterator};
70use std::panic;
71use std::sync::mpsc;
72use std::thread;
73
74/// A builder that runs closures in parallel.
75#[must_use]
76pub struct Parallel<'a, T> {
77    /// Closures to run.
78    closures: Vec<Box<dyn FnOnce() -> T + Send + 'a>>,
79}
80
81impl<'a, T> Parallel<'a, T> {
82    /// Creates a builder for running closures in parallel.
83    ///
84    /// # Examples
85    ///
86    /// ```
87    /// use easy_parallel::Parallel;
88    ///
89    /// let p = Parallel::<()>::new();
90    /// ```
91    pub fn new() -> Parallel<'a, T> {
92        Parallel {
93            closures: Vec::new(),
94        }
95    }
96
97    /// Adds a closure to the list.
98    ///
99    /// # Examples
100    ///
101    /// ```
102    /// use easy_parallel::Parallel;
103    ///
104    /// Parallel::new()
105    ///     .add(|| println!("hello from a thread"))
106    ///     .run();
107    /// ```
108    #[allow(clippy::should_implement_trait)]
109    pub fn add<F>(mut self, f: F) -> Parallel<'a, T>
110    where
111        F: FnOnce() -> T + Send + 'a,
112        T: Send + 'a,
113    {
114        self.closures.push(Box::new(f));
115        self
116    }
117
118    /// Adds a cloned closure for each item in an iterator.
119    ///
120    /// Each clone of the closure takes an item as an argument.
121    ///
122    /// # Examples
123    ///
124    /// ```
125    /// use easy_parallel::Parallel;
126    ///
127    /// Parallel::new()
128    ///     .each(0..5, |i| println!("hello from thread #{}", i))
129    ///     .run();
130    /// ```
131    pub fn each<A, I, F>(mut self, iter: I, f: F) -> Parallel<'a, T>
132    where
133        I: IntoIterator<Item = A>,
134        F: FnOnce(A) -> T + Clone + Send + 'a,
135        A: Send + 'a,
136        T: Send + 'a,
137    {
138        for t in iter.into_iter() {
139            let f = f.clone();
140            self.closures.push(Box::new(|| f(t)));
141        }
142        self
143    }
144
145    /// Runs each closure on a separate thread and collects their results.
146    ///
147    /// Results are collected in the order in which closures were added. One of the closures always
148    /// runs on the main thread because there is no point in spawning an extra thread for it.
149    ///
150    /// If a closure panics, panicking will resume in the main thread after all threads are joined.
151    ///
152    /// # Examples
153    ///
154    /// ```
155    /// use easy_parallel::Parallel;
156    /// use std::thread;
157    /// use std::time::Duration;
158    ///
159    /// let res = Parallel::new()
160    ///     .each(1..=3, |i| 10 * i)
161    ///     .add(|| 100)
162    ///     .collect::<Vec<_>>();
163    ///
164    /// assert_eq!(res, [10, 20, 30, 100]);
165    /// ```
166    pub fn collect<C>(mut self) -> C
167    where
168        T: Send + 'a,
169        C: FromIterator<T> + Extend<T>,
170    {
171        // Get the last closure.
172        let f = match self.closures.pop() {
173            None => return iter::empty().collect(),
174            Some(f) => f,
175        };
176
177        // Spawn threads, run the last closure on the current thread.
178        let (mut results, r) = self.finish_in::<_, _, C>(f);
179        results.extend(Some(r));
180        results
181    }
182
183    /// Runs each closure on a separate thread and collects their results.
184    ///
185    /// Results are collected in the order in which closures were added. One of the closures always
186    /// runs on the main thread because there is no point in spawning an extra thread for it.
187    ///
188    /// If a closure panics, panicking will resume in the main thread after all threads are joined.
189    ///
190    /// # Examples
191    ///
192    /// ```
193    /// use easy_parallel::Parallel;
194    /// use std::thread;
195    /// use std::time::Duration;
196    ///
197    /// let res = Parallel::new()
198    ///     .each(1..=3, |i| 10 * i)
199    ///     .add(|| 100)
200    ///     .run();
201    ///
202    /// assert_eq!(res, [10, 20, 30, 100]);
203    /// ```
204    pub fn run(self) -> Vec<T>
205    where
206        T: Send + 'a,
207    {
208        self.collect()
209    }
210
211    /// Finishes with a closure to run on the main thread, starts threads, and collects results.
212    ///
213    /// Results are collected in the order in which closures were added.
214    ///
215    /// If a closure panics, panicking will resume in the main thread after all threads are joined.
216    ///
217    /// # Examples
218    ///
219    /// ```
220    /// use easy_parallel::Parallel;
221    /// use std::thread;
222    /// use std::time::Duration;
223    ///
224    /// let (res, ()) = Parallel::new()
225    ///     .each(1..=3, |i| 10 * i)
226    ///     .finish(|| println!("Waiting for results"));
227    ///
228    /// assert_eq!(res, [10, 20, 30]);
229    /// ```
230    pub fn finish<F, R>(self, f: F) -> (Vec<T>, R)
231    where
232        F: FnOnce() -> R,
233        T: Send + 'a,
234    {
235        self.finish_in::<_, _, Vec<T>>(f)
236    }
237
238    /// Finishes with a closure to run on the main thread, starts threads, and collects results into an
239    /// arbitrary container.
240    ///
241    /// Results are collected in the order in which closures were added.
242    ///
243    /// If a closure panics, panicking will resume in the main thread after all threads are joined.
244    ///
245    /// # Examples
246    ///
247    /// ```
248    /// use easy_parallel::Parallel;
249    /// use std::thread;
250    /// use std::time::Duration;
251    ///
252    /// let (res, ()) = Parallel::new()
253    ///     .each(1..=3, |i| 10 * i)
254    ///     .finish_in::<_, _, Vec<i32>>(|| println!("Waiting for results"));
255    ///
256    /// assert_eq!(res, [10, 20, 30]);
257    /// ```
258    pub fn finish_in<F, R, C>(self, f: F) -> (C, R)
259    where
260        F: FnOnce() -> R,
261        T: Send + 'a,
262        C: FromIterator<T>,
263    {
264        // Set up a new thread scope.
265        thread::scope(|scope| {
266            // Join handles for spawned threads.
267            let mut handles = Vec::new();
268
269            // Channels to collect results from spawned threads.
270            let mut receivers = Vec::new();
271
272            for f in self.closures.into_iter() {
273                // Wrap into a closure that sends the result back.
274                let (sender, receiver) = mpsc::channel();
275                let f = move || sender.send(f()).unwrap();
276
277                // Spawn it on the scope.
278                handles.push(scope.spawn(f));
279                receivers.push(receiver);
280            }
281
282            let mut last_err = None;
283
284            // Run the main closure on the main thread.
285            let res = panic::catch_unwind(panic::AssertUnwindSafe(f));
286
287            // Join threads and save the last panic if there was one.
288            for h in handles {
289                if let Err(err) = h.join() {
290                    last_err = Some(err);
291                }
292            }
293
294            // If a thread has panicked, resume the last collected panic.
295            if let Some(err) = last_err {
296                panic::resume_unwind(err);
297            }
298
299            // Collect the results from threads.
300            let results = receivers.into_iter().map(|r| r.recv().unwrap()).collect();
301
302            // If the main closure panicked, resume its panic.
303            match res {
304                Ok(r) => (results, r),
305                Err(err) => panic::resume_unwind(err),
306            }
307        })
308    }
309}
310
311impl<T> fmt::Debug for Parallel<'_, T> {
312    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313        f.debug_struct("Parallel")
314            .field("len", &self.closures.len())
315            .finish()
316    }
317}
318
319impl<T> Default for Parallel<'_, T> {
320    fn default() -> Self {
321        Self::new()
322    }
323}