rayon/iter/plumbing/
mod.rs

1//! Traits and functions used to implement parallel iteration.  These are
2//! low-level details -- users of parallel iterators should not need to
3//! interact with them directly.  See [the `plumbing` README][r] for a general overview.
4//!
5//! [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
6
7use crate::join_context;
8
9use super::IndexedParallelIterator;
10
11use std::cmp;
12use std::usize;
13
/// The `ProducerCallback` trait is a kind of generic closure,
/// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in
/// the plumbing README][r] for more details.
///
/// The type parameter `T` is the item type of the producer that is handed
/// to `callback`. The callback is consumed by the call (it takes `self`
/// by value), so it can be invoked at most once.
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback
/// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html
pub trait ProducerCallback<T> {
    /// The type of value returned by this callback. Analogous to
    /// [`Output` from the `FnOnce` trait][Output].
    ///
    /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output
    type Output;

    /// Invokes the callback with the given producer as argument. The
    /// key point of this trait is that this method is generic over
    /// `P`, and hence implementors must be defined for any producer
    /// whose items are of type `T`.
    fn callback<P>(self, producer: P) -> Self::Output
    where
        P: Producer<Item = T>;
}
34
35/// A `Producer` is effectively a "splittable `IntoIterator`". That
36/// is, a producer is a value which can be converted into an iterator
37/// at any time: at that point, it simply produces items on demand,
38/// like any iterator. But what makes a `Producer` special is that,
39/// *before* we convert to an iterator, we can also **split** it at a
40/// particular point using the `split_at` method. This will yield up
41/// two producers, one producing the items before that point, and one
42/// producing the items after that point (these two producers can then
43/// independently be split further, or be converted into iterators).
44/// In Rayon, this splitting is used to divide between threads.
45/// See [the `plumbing` README][r] for further details.
46///
47/// Note that each producer will always produce a fixed number of
48/// items N. However, this number N is not queryable through the API;
49/// the consumer is expected to track it.
50///
51/// NB. You might expect `Producer` to extend the `IntoIterator`
52/// trait.  However, [rust-lang/rust#20671][20671] prevents us from
53/// declaring the DoubleEndedIterator and ExactSizeIterator
54/// constraints on a required IntoIterator trait, so we inline
55/// IntoIterator here until that issue is fixed.
56///
57/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
58/// [20671]: https://github.com/rust-lang/rust/issues/20671
59pub trait Producer: Send + Sized {
60    /// The type of item that will be produced by this producer once
61    /// it is converted into an iterator.
62    type Item;
63
64    /// The type of iterator we will become.
65    type IntoIter: Iterator<Item = Self::Item> + DoubleEndedIterator + ExactSizeIterator;
66
67    /// Convert `self` into an iterator; at this point, no more parallel splits
68    /// are possible.
69    fn into_iter(self) -> Self::IntoIter;
70
71    /// The minimum number of items that we will process
72    /// sequentially. Defaults to 1, which means that we will split
73    /// all the way down to a single item. This can be raised higher
74    /// using the [`with_min_len`] method, which will force us to
75    /// create sequential tasks at a larger granularity. Note that
76    /// Rayon automatically normally attempts to adjust the size of
77    /// parallel splits to reduce overhead, so this should not be
78    /// needed.
79    ///
80    /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len
81    fn min_len(&self) -> usize {
82        1
83    }
84
85    /// The maximum number of items that we will process
86    /// sequentially. Defaults to MAX, which means that we can choose
87    /// not to split at all. This can be lowered using the
88    /// [`with_max_len`] method, which will force us to create more
89    /// parallel tasks. Note that Rayon automatically normally
90    /// attempts to adjust the size of parallel splits to reduce
91    /// overhead, so this should not be needed.
92    ///
93    /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len
94    fn max_len(&self) -> usize {
95        usize::MAX
96    }
97
98    /// Split into two producers; one produces items `0..index`, the
99    /// other `index..N`. Index must be less than or equal to `N`.
100    fn split_at(self, index: usize) -> (Self, Self);
101
102    /// Iterate the producer, feeding each element to `folder`, and
103    /// stop when the folder is full (or all elements have been consumed).
104    ///
105    /// The provided implementation is sufficient for most iterables.
106    fn fold_with<F>(self, folder: F) -> F
107    where
108        F: Folder<Self::Item>,
109    {
110        folder.consume_iter(self.into_iter())
111    }
112}
113
/// A consumer is effectively a [generalized "fold" operation][fold],
/// and in fact each consumer will eventually be converted into a
/// [`Folder`]. What makes a consumer special is that, like a
/// [`Producer`], it can be **split** into multiple consumers using
/// the `split_at` method. When a consumer is split, it produces two
/// consumers, as well as a **reducer**. The two consumers can be fed
/// items independently, and when they are done the reducer is used to
/// combine their two results into one. See [the `plumbing`
/// README][r] for further details.
///
/// The type parameter `Item` is the type of the items this consumer
/// accepts.
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
/// [`Folder`]: trait.Folder.html
/// [`Producer`]: trait.Producer.html
pub trait Consumer<Item>: Send + Sized {
    /// The type of folder that this consumer can be converted into.
    /// Its `Result` must be the same as this consumer's `Result`.
    type Folder: Folder<Item, Result = Self::Result>;

    /// The type of reducer that is produced if this consumer is split.
    /// It combines two values of this consumer's `Result` into one.
    type Reducer: Reducer<Self::Result>;

    /// The type of result that this consumer will ultimately produce.
    type Result: Send;

    /// Divide the consumer into two consumers, one processing items
    /// `0..index` and one processing items from `index..`. Also
    /// produces a reducer that can be used to reduce the results at
    /// the end. The returned tuple is `(left, right, reducer)`.
    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer);

    /// Convert the consumer into a folder that can consume items
    /// sequentially, eventually producing a final result.
    fn into_folder(self) -> Self::Folder;

    /// Hint whether this `Consumer` would like to stop processing
    /// further items, e.g. if a search has been completed.
    fn full(&self) -> bool;
}
152
/// The `Folder` trait captures [the standard fold operation][fold] as
/// a value: items are pushed into it one at a time with `consume` (or
/// in bulk with `consume_iter`), and once no more items are coming it
/// is turned into the final value with `complete`.
///
/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
pub trait Folder<Item>: Sized {
    /// The type of the final value this folder produces.
    type Result;

    /// Consumes the next item and returns the updated sequential state.
    fn consume(self, item: Item) -> Self;

    /// Consumes items from `iter` until the folder is full or the
    /// iterator is exhausted, and returns the updated sequential state.
    ///
    /// This method is **optional**. The default pulls items from
    /// `iter` one at a time, passes each to `consume`, and stops as
    /// soon as `full` returns true after an item has been consumed.
    /// Note that `full` is only checked *after* each `consume`, so a
    /// non-empty iterator always has its first item consumed.
    ///
    /// The main reason to override it is to provide a more
    /// specialized, efficient implementation.
    fn consume_iter<I>(self, iter: I) -> Self
    where
        I: IntoIterator<Item = Item>,
    {
        let mut folder = self;
        let mut items = iter.into_iter();
        loop {
            match items.next() {
                Some(item) => {
                    folder = folder.consume(item);
                    if folder.full() {
                        break folder;
                    }
                }
                None => break folder,
            }
        }
    }

    /// Finishes consuming items and produces the final result.
    fn complete(self) -> Self::Result;

    /// Hint whether this `Folder` would like to stop processing
    /// further items, e.g. if a search has been completed.
    fn full(&self) -> bool;
}
194
/// The reducer is the final step of a `Consumer` -- after a consumer
/// has been split into two parts, and each of those parts has been
/// fully processed, we are left with two results. The reducer is then
/// used to combine those two results into one. See [the `plumbing`
/// README][r] for further details.
///
/// The type parameter `Result` is the type of the values being combined;
/// note that it is a generic parameter of this trait, not the standard
/// library's `Result` type.
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
pub trait Reducer<Result> {
    /// Reduce two final results into one; this is executed after a
    /// split. `left` is the result of the left-hand part of the split
    /// and `right` the result of the right-hand part. The reducer is
    /// consumed by the call.
    fn reduce(self, left: Result, right: Result) -> Result;
}
207
/// A stateless consumer can be freely copied. These consumers can be
/// used like regular consumers, but they also support a
/// `split_off_left` method that does not take an index to split, but
/// simply splits at some arbitrary point (`for_each`, for example,
/// produces an unindexed consumer).
///
/// Both methods below take `&self`, so the original consumer remains
/// usable after a split.
pub trait UnindexedConsumer<I>: Consumer<I> {
    /// Splits off a "left" consumer and returns it. The `self`
    /// consumer should then be used to consume the "right" portion of
    /// the data. (The ordering matters for methods like find_first --
    /// values produced by the returned value are given precedence
    /// over values produced by `self`.) Once the left and right
    /// halves have been fully consumed, you should reduce the results
    /// with the result of `to_reducer`.
    fn split_off_left(&self) -> Self;

    /// Creates a reducer that can be used to combine the results from
    /// a split consumer.
    fn to_reducer(&self) -> Self::Reducer;
}
227
/// A variant on `Producer` which does not know its exact length or
/// cannot represent it in a `usize`. These producers act like
/// ordinary producers except that they cannot be told to split at a
/// particular point. Instead, you just ask them to split 'somewhere'.
///
/// (In principle, `Producer` could extend this trait; however, it
/// does not because to do so would require producers to carry their
/// own length with them.)
pub trait UnindexedProducer: Send + Sized {
    /// The type of item returned by this producer.
    type Item;

    /// Split midway into a new producer if possible, otherwise return `None`.
    ///
    /// The first element of the returned pair is always a producer; the
    /// second is `Some` only when a split actually happened. When it is
    /// `None`, the first element is the only producer left to drive.
    fn split(self) -> (Self, Option<Self>);

    /// Iterate the producer, feeding each element to `folder`, and
    /// stop when the folder is full (or all elements have been consumed).
    ///
    /// Unlike `Producer::fold_with`, there is no provided implementation:
    /// every implementor must supply this method.
    fn fold_with<F>(self, folder: F) -> F
    where
        F: Folder<Self::Item>;
}
249
/// A splitter controls the policy for splitting into smaller work items.
///
/// Thief-splitting is an adaptive policy that starts by splitting into
/// enough jobs for every worker thread, and then resets itself whenever a
/// job is actually stolen into a different thread.
///
/// The type is `Copy`: when a job is split, each half receives its own
/// copy of the splitter state and evolves it independently.
#[derive(Clone, Copy)]
struct Splitter {
    /// The `splits` tell us approximately how many remaining times we'd
    /// like to split this job.  We always just divide it by two though, so
    /// the effective number of pieces will be `next_power_of_two()`.
    splits: usize,
}
262
263impl Splitter {
264    #[inline]
265    fn new() -> Splitter {
266        Splitter {
267            splits: crate::current_num_threads(),
268        }
269    }
270
271    #[inline]
272    fn try_split(&mut self, stolen: bool) -> bool {
273        let Splitter { splits } = *self;
274
275        if stolen {
276            // This job was stolen!  Reset the number of desired splits to the
277            // thread count, if that's more than we had remaining anyway.
278            self.splits = cmp::max(crate::current_num_threads(), self.splits / 2);
279            true
280        } else if splits > 0 {
281            // We have splits remaining, make it so.
282            self.splits /= 2;
283            true
284        } else {
285            // Not stolen, and no more splits -- we're done!
286            false
287        }
288    }
289}
290
/// The length splitter is built on thief-splitting, but additionally takes
/// into account the remaining length of the iterator.
#[derive(Clone, Copy)]
struct LengthSplitter {
    /// The underlying thief-splitting state that tracks how many more
    /// splits we would like to perform.
    inner: Splitter,

    /// The smallest we're willing to divide into.  Usually this is just 1,
    /// but you can choose a larger working size with `with_min_len()`.
    min: usize,
}
301
302impl LengthSplitter {
303    /// Creates a new splitter based on lengths.
304    ///
305    /// The `min` is a hard lower bound.  We'll never split below that, but
306    /// of course an iterator might start out smaller already.
307    ///
308    /// The `max` is an upper bound on the working size, used to determine
309    /// the minimum number of times we need to split to get under that limit.
310    /// The adaptive algorithm may very well split even further, but never
311    /// smaller than the `min`.
312    #[inline]
313    fn new(min: usize, max: usize, len: usize) -> LengthSplitter {
314        let mut splitter = LengthSplitter {
315            inner: Splitter::new(),
316            min: cmp::max(min, 1),
317        };
318
319        // Divide the given length by the max working length to get the minimum
320        // number of splits we need to get under that max.  This rounds down,
321        // but the splitter actually gives `next_power_of_two()` pieces anyway.
322        // e.g. len 12345 / max 100 = 123 min_splits -> 128 pieces.
323        let min_splits = len / cmp::max(max, 1);
324
325        // Only update the value if it's not splitting enough already.
326        if min_splits > splitter.inner.splits {
327            splitter.inner.splits = min_splits;
328        }
329
330        splitter
331    }
332
333    #[inline]
334    fn try_split(&mut self, len: usize, stolen: bool) -> bool {
335        // If splitting wouldn't make us too small, try the inner splitter.
336        len / 2 >= self.min && self.inner.try_split(stolen)
337    }
338}
339
340/// This helper function is used to "connect" a parallel iterator to a
341/// consumer. It will convert the `par_iter` into a producer P and
342/// then pull items from P and feed them to `consumer`, splitting and
343/// creating parallel threads as needed.
344///
345/// This is useful when you are implementing your own parallel
346/// iterators: it is often used as the definition of the
347/// [`drive_unindexed`] or [`drive`] methods.
348///
349/// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed
350/// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive
351pub fn bridge<I, C>(par_iter: I, consumer: C) -> C::Result
352where
353    I: IndexedParallelIterator,
354    C: Consumer<I::Item>,
355{
356    let len = par_iter.len();
357    return par_iter.with_producer(Callback { len, consumer });
358
359    struct Callback<C> {
360        len: usize,
361        consumer: C,
362    }
363
364    impl<C, I> ProducerCallback<I> for Callback<C>
365    where
366        C: Consumer<I>,
367    {
368        type Output = C::Result;
369        fn callback<P>(self, producer: P) -> C::Result
370        where
371            P: Producer<Item = I>,
372        {
373            bridge_producer_consumer(self.len, producer, self.consumer)
374        }
375    }
376}
377
378/// This helper function is used to "connect" a producer and a
379/// consumer. You may prefer to call [`bridge`], which wraps this
380/// function. This function will draw items from `producer` and feed
381/// them to `consumer`, splitting and creating parallel tasks when
382/// needed.
383///
384/// This is useful when you are implementing your own parallel
385/// iterators: it is often used as the definition of the
386/// [`drive_unindexed`] or [`drive`] methods.
387///
388/// [`bridge`]: fn.bridge.html
389/// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed
390/// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive
391pub fn bridge_producer_consumer<P, C>(len: usize, producer: P, consumer: C) -> C::Result
392where
393    P: Producer,
394    C: Consumer<P::Item>,
395{
396    let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len);
397    return helper(len, false, splitter, producer, consumer);
398
399    fn helper<P, C>(
400        len: usize,
401        migrated: bool,
402        mut splitter: LengthSplitter,
403        producer: P,
404        consumer: C,
405    ) -> C::Result
406    where
407        P: Producer,
408        C: Consumer<P::Item>,
409    {
410        if consumer.full() {
411            consumer.into_folder().complete()
412        } else if splitter.try_split(len, migrated) {
413            let mid = len / 2;
414            let (left_producer, right_producer) = producer.split_at(mid);
415            let (left_consumer, right_consumer, reducer) = consumer.split_at(mid);
416            let (left_result, right_result) = join_context(
417                |context| {
418                    helper(
419                        mid,
420                        context.migrated(),
421                        splitter,
422                        left_producer,
423                        left_consumer,
424                    )
425                },
426                |context| {
427                    helper(
428                        len - mid,
429                        context.migrated(),
430                        splitter,
431                        right_producer,
432                        right_consumer,
433                    )
434                },
435            );
436            reducer.reduce(left_result, right_result)
437        } else {
438            producer.fold_with(consumer.into_folder()).complete()
439        }
440    }
441}
442
443/// A variant of [`bridge_producer_consumer`] where the producer is an unindexed producer.
444///
445/// [`bridge_producer_consumer`]: fn.bridge_producer_consumer.html
446pub fn bridge_unindexed<P, C>(producer: P, consumer: C) -> C::Result
447where
448    P: UnindexedProducer,
449    C: UnindexedConsumer<P::Item>,
450{
451    let splitter = Splitter::new();
452    bridge_unindexed_producer_consumer(false, splitter, producer, consumer)
453}
454
455fn bridge_unindexed_producer_consumer<P, C>(
456    migrated: bool,
457    mut splitter: Splitter,
458    producer: P,
459    consumer: C,
460) -> C::Result
461where
462    P: UnindexedProducer,
463    C: UnindexedConsumer<P::Item>,
464{
465    if consumer.full() {
466        consumer.into_folder().complete()
467    } else if splitter.try_split(migrated) {
468        match producer.split() {
469            (left_producer, Some(right_producer)) => {
470                let (reducer, left_consumer, right_consumer) =
471                    (consumer.to_reducer(), consumer.split_off_left(), consumer);
472                let bridge = bridge_unindexed_producer_consumer;
473                let (left_result, right_result) = join_context(
474                    |context| bridge(context.migrated(), splitter, left_producer, left_consumer),
475                    |context| bridge(context.migrated(), splitter, right_producer, right_consumer),
476                );
477                reducer.reduce(left_result, right_result)
478            }
479            (producer, None) => producer.fold_with(consumer.into_folder()).complete(),
480        }
481    } else {
482        producer.fold_with(consumer.into_folder()).complete()
483    }
484}