1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
use crossbeam_deque::{Steal, Stealer, Worker};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Mutex, TryLockError};
use std::thread::yield_now;
use crate::current_num_threads;
use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
use crate::iter::ParallelIterator;
/// Conversion trait to convert an `Iterator` to a `ParallelIterator`.
///
/// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items
/// across the Rayon thread pool. This has the advantage of being able to parallelize just about
/// anything, but the resulting `ParallelIterator` can be less efficient than if you started with
/// `par_iter` instead. However, it can still be useful for iterators that are difficult to
/// parallelize by other means, like channels or file or network I/O.
///
/// The resulting iterator is not guaranteed to keep the order of the original iterator.
///
/// # Examples
///
/// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can
/// use any of the `ParallelIterator` methods:
///
/// ```
/// use rayon::iter::ParallelBridge;
/// use rayon::prelude::ParallelIterator;
/// use std::sync::mpsc::channel;
///
/// let rx = {
///     let (tx, rx) = channel();
///
///     tx.send("one!");
///     tx.send("two!");
///     tx.send("three!");
///
///     rx
/// };
///
/// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
/// output.sort_unstable();
///
/// assert_eq!(&*output, &["one!", "three!", "two!"]);
/// ```
pub trait ParallelBridge: Sized {
/// Create a bridge from this type to a `ParallelIterator`.
///
/// Takes `self` by value and returns an `IterBridge` wrapping it. As noted on the trait,
/// the order of the original iterator is not guaranteed to be kept.
fn par_bridge(self) -> IterBridge<Self>;
}
// Blanket implementation: every `Send` iterator whose items are `Send` gets `par_bridge`.
impl<T> ParallelBridge for T
where
    T: Iterator + Send,
    T::Item: Send,
{
    // Only wraps the iterator; no item is pulled from it here.
    fn par_bridge(self) -> IterBridge<Self> {
        let iter = self;
        IterBridge { iter }
    }
}
/// `IterBridge` is a parallel iterator that wraps a sequential iterator.
///
/// This type is created when using the `par_bridge` method on `ParallelBridge`. See the
/// [`ParallelBridge`] documentation for details.
///
/// [`ParallelBridge`]: trait.ParallelBridge.html
#[derive(Debug, Clone)]
pub struct IterBridge<Iter> {
/// The wrapped sequential iterator. `drive_unindexed` moves it behind a mutex so that
/// the producers can take turns pulling items from it.
iter: Iter,
}
impl<Iter> ParallelIterator for IterBridge<Iter>
where
    Iter: Iterator + Send,
    Iter::Item: Send,
{
    type Item = Iter::Item;

    // Builds the shared state on this stack frame (split budget, exhaustion flag, and the
    // mutex-guarded iterator paired with its queue), then hands a producer that borrows
    // that state to `bridge_unindexed` together with `consumer`.
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        // Split budget: starts at the current number of threads.
        let remaining_splits = AtomicUsize::new(current_num_threads());
        // Flipped to `true` by whichever producer sees the iterator run dry.
        let exhausted = AtomicBool::new(false);

        // FIFO queue: the owning `Worker` travels with the iterator inside the mutex,
        // while the `Stealer` handle goes to the producer.
        let queue = Worker::new_fifo();
        let items = queue.stealer();
        let source = Mutex::new((self.iter, queue));

        let producer = IterParallelProducer {
            split_count: &remaining_splits,
            done: &exhausted,
            iter: &source,
            items,
        };
        bridge_unindexed(producer, consumer)
    }
}
/// Producer handed to `bridge_unindexed`: a cloneable handle onto state shared by reference
/// between all of its clones.
struct IterParallelProducer<'a, Iter: Iterator> {
/// Remaining split budget; `split` decrements it by one for every successful split.
split_count: &'a AtomicUsize,
/// Set to `true` once the wrapped iterator has returned `None`.
done: &'a AtomicBool,
/// The sequential iterator paired with the owning side of the item queue. Whoever holds
/// the lock pulls items from the iterator and pushes them onto the `Worker`.
iter: &'a Mutex<(Iter, Worker<Iter::Item>)>,
/// Stealing side of the item queue; `fold_with` takes its items from here.
items: Stealer<Iter::Item>,
}
// manual clone because T doesn't need to be Clone, but the derive assumes it should be
impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> {
fn clone(&self) -> Self {
IterParallelProducer {
split_count: self.split_count,
done: self.done,
iter: self.iter,
items: self.items.clone(),
}
}
}
impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter>
where
    Iter::Item: Send,
{
    type Item = Iter::Item;

    /// Splits off another handle onto the shared queue while the split budget lasts.
    ///
    /// Returns `(clone, Some(self))` after atomically decrementing `split_count`. Returns
    /// `(self, None)` when the budget has reached zero or the iterator has already been
    /// marked as done.
    fn split(self) -> (Self, Option<Self>) {
        let mut count = self.split_count.load(Ordering::SeqCst);

        loop {
            // Re-read `done` on every attempt: the iterator may run dry while we are
            // retrying the decrement.
            let done = self.done.load(Ordering::SeqCst);

            match count.checked_sub(1) {
                Some(new_count) if !done => {
                    // `compare_exchange` replaces the deprecated `compare_and_swap`. With
                    // `SeqCst` for both success and failure it has the same semantics, and
                    // it reports the outcome as a `Result` holding the previous value.
                    match self.split_count.compare_exchange(
                        count,
                        new_count,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    ) {
                        Ok(_) => return (self.clone(), Some(self)),
                        // Another producer changed the counter first; retry from its value.
                        Err(last_count) => count = last_count,
                    }
                }
                // Budget exhausted (`count == 0`) or nothing left to distribute.
                _ => return (self, None),
            }
        }
    }

    /// Feeds items stolen from the shared queue into `folder`.
    ///
    /// Returns the folder when it reports `full()`, when the queue is empty and the iterator
    /// has been marked as done, or when the iterator mutex is found poisoned. When the queue
    /// is empty but the iterator is not done, this producer tries to take the mutex and
    /// refill the queue from the iterator.
    fn fold_with<F>(self, mut folder: F) -> F
    where
        F: Folder<Self::Item>,
    {
        loop {
            match self.items.steal() {
                Steal::Success(it) => {
                    folder = folder.consume(it);
                    if folder.full() {
                        return folder;
                    }
                }
                Steal::Empty => {
                    if self.done.load(Ordering::SeqCst) {
                        // the iterator is out of items, no use in continuing
                        return folder;
                    }

                    // our cache is out of items, time to load more from the iterator
                    match self.iter.try_lock() {
                        Ok(mut guard) => {
                            let threads = current_num_threads();
                            let count = (threads * threads) * 2;

                            let (ref mut iter, ref worker) = *guard;

                            // FIXME the new deque doesn't let us count items. We can just
                            // push a number of items, but that doesn't consider active
                            // stealers elsewhere.
                            for _ in 0..count {
                                if let Some(it) = iter.next() {
                                    worker.push(it);
                                } else {
                                    self.done.store(true, Ordering::SeqCst);
                                    break;
                                }
                            }
                        }
                        Err(TryLockError::WouldBlock) => {
                            // someone else has the mutex, just sit tight until it's ready
                            yield_now(); // TODO: use a thread-pool-aware yield? (#548)
                        }
                        Err(TryLockError::Poisoned(_)) => {
                            // any panics from other threads will have been caught by the pool,
                            // and will be re-thrown when joined - just exit
                            return folder;
                        }
                    }
                }
                // The steal operation asks to be retried; loop around and try again.
                Steal::Retry => (),
            }
        }
    }
}