snarkvm_utilities/
parallel.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkVM library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{boxed::Box, vec::Vec};
17
18pub struct ExecutionPool<'a, T> {
19    jobs: Vec<Box<dyn 'a + FnOnce() -> T + Send>>,
20}
21
22impl<'a, T> ExecutionPool<'a, T> {
23    pub fn new() -> Self {
24        Self { jobs: Vec::new() }
25    }
26
27    pub fn with_capacity(cap: usize) -> Self {
28        Self { jobs: Vec::with_capacity(cap) }
29    }
30
31    pub fn add_job<F: 'a + FnOnce() -> T + Send>(&mut self, f: F) {
32        self.jobs.push(Box::new(f));
33    }
34
35    pub fn execute_all(self) -> Vec<T>
36    where
37        T: Send + Sync,
38    {
39        #[cfg(not(feature = "serial"))]
40        {
41            use rayon::prelude::*;
42            execute_with_max_available_threads(|| self.jobs.into_par_iter().map(|f| f()).collect())
43        }
44        #[cfg(feature = "serial")]
45        {
46            self.jobs.into_iter().map(|f| f()).collect()
47        }
48    }
49}
50
51impl<T> Default for ExecutionPool<'_, T> {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57#[cfg(not(feature = "serial"))]
58pub fn max_available_threads() -> usize {
59    use aleo_std::Cpu;
60    let rayon_threads = rayon::current_num_threads();
61
62    match aleo_std::get_cpu() {
63        Cpu::Intel => num_cpus::get_physical().min(rayon_threads),
64        Cpu::AMD | Cpu::Unknown => rayon_threads,
65    }
66}
67
68#[inline(always)]
69#[cfg(not(any(feature = "serial", feature = "wasm")))]
70pub fn execute_with_max_available_threads<T: Sync + Send>(f: impl FnOnce() -> T + Send) -> T {
71    execute_with_threads(f, max_available_threads())
72}
73
74#[inline(always)]
75#[cfg(any(feature = "serial", feature = "wasm"))]
76pub fn execute_with_max_available_threads<T>(f: impl FnOnce() -> T + Send) -> T {
77    f()
78}
79
80#[cfg(not(any(feature = "serial", feature = "wasm")))]
81#[inline(always)]
82fn execute_with_threads<T: Sync + Send>(f: impl FnOnce() -> T + Send, num_threads: usize) -> T {
83    let pool = rayon::ThreadPoolBuilder::new().num_threads(num_threads).build().unwrap();
84    pool.install(f)
85}
86
87/// Creates parallel iterator over refs if `parallel` feature is enabled.
88#[macro_export]
89macro_rules! cfg_iter {
90    ($e: expr) => {{
91        #[cfg(not(feature = "serial"))]
92        let result = $e.par_iter();
93
94        #[cfg(feature = "serial")]
95        let result = $e.iter();
96
97        result
98    }};
99}
100
101/// Creates parallel iterator over mut refs if `parallel` feature is enabled.
102#[macro_export]
103macro_rules! cfg_iter_mut {
104    ($e: expr) => {{
105        #[cfg(not(feature = "serial"))]
106        let result = $e.par_iter_mut();
107
108        #[cfg(feature = "serial")]
109        let result = $e.iter_mut();
110
111        result
112    }};
113}
114
115/// Creates parallel iterator if `parallel` feature is enabled.
116#[macro_export]
117macro_rules! cfg_into_iter {
118    ($e: expr) => {{
119        #[cfg(not(feature = "serial"))]
120        let result = $e.into_par_iter();
121
122        #[cfg(feature = "serial")]
123        let result = $e.into_iter();
124
125        result
126    }};
127}
128
129/// Returns an iterator over `chunk_size` elements of the slice at a
130/// time.
131#[macro_export]
132macro_rules! cfg_chunks {
133    ($e: expr, $size: expr) => {{
134        #[cfg(not(feature = "serial"))]
135        let result = $e.par_chunks($size);
136
137        #[cfg(feature = "serial")]
138        let result = $e.chunks($size);
139
140        result
141    }};
142}
143
144/// Returns an iterator over `chunk_size` elements of the slice at a time.
145#[macro_export]
146macro_rules! cfg_chunks_mut {
147    ($e: expr, $size: expr) => {{
148        #[cfg(not(feature = "serial"))]
149        let result = $e.par_chunks_mut($size);
150
151        #[cfg(feature = "serial")]
152        let result = $e.chunks_mut($size);
153
154        result
155    }};
156}
157
158/// Creates parallel iterator from iterator if `parallel` feature is enabled.
159#[macro_export]
160macro_rules! cfg_par_bridge {
161    ($e: expr) => {{
162        #[cfg(not(feature = "serial"))]
163        let result = $e.par_bridge();
164
165        #[cfg(feature = "serial")]
166        let result = $e;
167
168        result
169    }};
170}
171
172/// Applies the reduce operation over an iterator.
173#[macro_export]
174macro_rules! cfg_reduce {
175    ($e: expr, $default: expr, $op: expr) => {{
176        #[cfg(not(feature = "serial"))]
177        let result = $e.reduce($default, $op);
178
179        #[cfg(feature = "serial")]
180        let result = $e.fold($default(), $op);
181
182        result
183    }};
184}
185
186/// Applies `reduce_with` or `reduce` depending on the `serial` feature.
187#[macro_export]
188macro_rules! cfg_reduce_with {
189    ($e: expr, $op: expr) => {{
190        #[cfg(not(feature = "serial"))]
191        let result = $e.reduce_with($op);
192
193        #[cfg(feature = "serial")]
194        let result = $e.reduce($op);
195
196        result
197    }};
198}
199
200/// Turns a collection into an iterator.
201#[macro_export]
202macro_rules! cfg_keys {
203    ($e: expr) => {{
204        #[cfg(not(feature = "serial"))]
205        let result = $e.par_keys();
206
207        #[cfg(feature = "serial")]
208        let result = $e.keys();
209
210        result
211    }};
212}
213
214/// Turns a collection into an iterator.
215#[macro_export]
216macro_rules! cfg_values {
217    ($e: expr) => {{
218        #[cfg(not(feature = "serial"))]
219        let result = $e.par_values();
220
221        #[cfg(feature = "serial")]
222        let result = $e.values();
223
224        result
225    }};
226}
227
228/// Finds the first element that satisfies the predicate function
229#[macro_export]
230macro_rules! cfg_find {
231    ($self:expr, $object:expr, $func:ident) => {{
232        #[cfg(not(feature = "serial"))]
233        let result = $self.par_values().find_any(|tx| tx.$func($object));
234
235        #[cfg(feature = "serial")]
236        let result = $self.values().find(|tx| tx.$func($object));
237
238        result
239    }};
240}
241
242/// Applies a function and returns the first value that is not None
243#[macro_export]
244macro_rules! cfg_find_map {
245    ($self:expr, $object:expr, $func:ident) => {{
246        #[cfg(not(feature = "serial"))]
247        let result = $self.par_values().filter_map(|tx| tx.$func($object)).find_any(|_| true);
248
249        #[cfg(feature = "serial")]
250        let result = $self.values().find_map(|tx| tx.$func($object));
251
252        result
253    }};
254}
255
256/// Applies fold to the iterator
257#[macro_export]
258macro_rules! cfg_zip_fold {
259    ($self: expr, $other: expr, $init: expr, $op: expr, $type: ty) => {{
260        let default = $init;
261
262        #[cfg(feature = "serial")]
263        let default = $init();
264        let result = $self.zip_eq($other).fold(default, $op);
265
266        #[cfg(not(feature = "serial"))]
267        let result = result.sum::<$type>();
268
269        result
270    }};
271}
272
273/// Performs an unstable sort
274#[macro_export]
275macro_rules! cfg_sort_unstable_by {
276    ($self: expr, $closure: expr) => {{
277        #[cfg(feature = "serial")]
278        $self.sort_unstable_by($closure);
279
280        #[cfg(not(feature = "serial"))]
281        $self.par_sort_unstable_by($closure);
282    }};
283}
284
285/// Performs a sort that caches the extracted keys
286#[macro_export]
287macro_rules! cfg_sort_by_cached_key {
288    ($self: expr, $closure: expr) => {{
289        #[cfg(feature = "serial")]
290        $self.sort_by_cached_key($closure);
291
292        #[cfg(not(feature = "serial"))]
293        $self.par_sort_by_cached_key($closure);
294    }};
295}
296
297/// Returns a sorted, by-value iterator for the given IndexMap/IndexSet
298#[macro_export]
299macro_rules! cfg_sorted_by {
300    ($self: expr, $closure: expr) => {{
301        #[cfg(feature = "serial")]
302        {
303            $self.sorted_by($closure)
304        }
305
306        #[cfg(not(feature = "serial"))]
307        {
308            $self.par_sorted_by($closure)
309        }
310    }};
311}