rayon/iter/
chunks.rs

1use std::cmp::min;
2
3use super::plumbing::*;
4use super::*;
5use crate::math::div_round_up;
6
7/// `Chunks` is an iterator that groups elements of an underlying iterator.
8///
9/// This struct is created by the [`chunks()`] method on [`IndexedParallelIterator`]
10///
11/// [`chunks()`]: trait.IndexedParallelIterator.html#method.chunks
12/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
13#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
14#[derive(Debug, Clone)]
15pub struct Chunks<I>
16where
17    I: IndexedParallelIterator,
18{
19    size: usize,
20    i: I,
21}
22
23impl<I> Chunks<I>
24where
25    I: IndexedParallelIterator,
26{
27    /// Creates a new `Chunks` iterator
28    pub(super) fn new(i: I, size: usize) -> Self {
29        Chunks { i, size }
30    }
31}
32
33impl<I> ParallelIterator for Chunks<I>
34where
35    I: IndexedParallelIterator,
36{
37    type Item = Vec<I::Item>;
38
39    fn drive_unindexed<C>(self, consumer: C) -> C::Result
40    where
41        C: Consumer<Vec<I::Item>>,
42    {
43        bridge(self, consumer)
44    }
45
46    fn opt_len(&self) -> Option<usize> {
47        Some(self.len())
48    }
49}
50
51impl<I> IndexedParallelIterator for Chunks<I>
52where
53    I: IndexedParallelIterator,
54{
55    fn drive<C>(self, consumer: C) -> C::Result
56    where
57        C: Consumer<Self::Item>,
58    {
59        bridge(self, consumer)
60    }
61
62    fn len(&self) -> usize {
63        div_round_up(self.i.len(), self.size)
64    }
65
66    fn with_producer<CB>(self, callback: CB) -> CB::Output
67    where
68        CB: ProducerCallback<Self::Item>,
69    {
70        let len = self.i.len();
71        return self.i.with_producer(Callback {
72            size: self.size,
73            len,
74            callback,
75        });
76
77        struct Callback<CB> {
78            size: usize,
79            len: usize,
80            callback: CB,
81        }
82
83        impl<T, CB> ProducerCallback<T> for Callback<CB>
84        where
85            CB: ProducerCallback<Vec<T>>,
86        {
87            type Output = CB::Output;
88
89            fn callback<P>(self, base: P) -> CB::Output
90            where
91                P: Producer<Item = T>,
92            {
93                let producer = ChunkProducer::new(self.size, self.len, base, Vec::from_iter);
94                self.callback.callback(producer)
95            }
96        }
97    }
98}
99
100pub(super) struct ChunkProducer<P, F> {
101    chunk_size: usize,
102    len: usize,
103    base: P,
104    map: F,
105}
106
107impl<P, F> ChunkProducer<P, F> {
108    pub(super) fn new(chunk_size: usize, len: usize, base: P, map: F) -> Self {
109        Self {
110            chunk_size,
111            len,
112            base,
113            map,
114        }
115    }
116}
117
118impl<P, F, T> Producer for ChunkProducer<P, F>
119where
120    P: Producer,
121    F: Fn(P::IntoIter) -> T + Send + Clone,
122{
123    type Item = T;
124    type IntoIter = std::iter::Map<ChunkSeq<P>, F>;
125
126    fn into_iter(self) -> Self::IntoIter {
127        let chunks = ChunkSeq {
128            chunk_size: self.chunk_size,
129            len: self.len,
130            inner: if self.len > 0 { Some(self.base) } else { None },
131        };
132        chunks.map(self.map)
133    }
134
135    fn split_at(self, index: usize) -> (Self, Self) {
136        let elem_index = min(index * self.chunk_size, self.len);
137        let (left, right) = self.base.split_at(elem_index);
138        (
139            ChunkProducer {
140                chunk_size: self.chunk_size,
141                len: elem_index,
142                base: left,
143                map: self.map.clone(),
144            },
145            ChunkProducer {
146                chunk_size: self.chunk_size,
147                len: self.len - elem_index,
148                base: right,
149                map: self.map,
150            },
151        )
152    }
153
154    fn min_len(&self) -> usize {
155        div_round_up(self.base.min_len(), self.chunk_size)
156    }
157
158    fn max_len(&self) -> usize {
159        self.base.max_len() / self.chunk_size
160    }
161}
162
163pub(super) struct ChunkSeq<P> {
164    chunk_size: usize,
165    len: usize,
166    inner: Option<P>,
167}
168
169impl<P> Iterator for ChunkSeq<P>
170where
171    P: Producer,
172{
173    type Item = P::IntoIter;
174
175    fn next(&mut self) -> Option<Self::Item> {
176        let producer = self.inner.take()?;
177        if self.len > self.chunk_size {
178            let (left, right) = producer.split_at(self.chunk_size);
179            self.inner = Some(right);
180            self.len -= self.chunk_size;
181            Some(left.into_iter())
182        } else {
183            debug_assert!(self.len > 0);
184            self.len = 0;
185            Some(producer.into_iter())
186        }
187    }
188
189    fn size_hint(&self) -> (usize, Option<usize>) {
190        let len = self.len();
191        (len, Some(len))
192    }
193}
194
195impl<P> ExactSizeIterator for ChunkSeq<P>
196where
197    P: Producer,
198{
199    #[inline]
200    fn len(&self) -> usize {
201        div_round_up(self.len, self.chunk_size)
202    }
203}
204
205impl<P> DoubleEndedIterator for ChunkSeq<P>
206where
207    P: Producer,
208{
209    fn next_back(&mut self) -> Option<Self::Item> {
210        let producer = self.inner.take()?;
211        if self.len > self.chunk_size {
212            let mut size = self.len % self.chunk_size;
213            if size == 0 {
214                size = self.chunk_size;
215            }
216            let (left, right) = producer.split_at(self.len - size);
217            self.inner = Some(left);
218            self.len -= size;
219            Some(right.into_iter())
220        } else {
221            debug_assert!(self.len > 0);
222            self.len = 0;
223            Some(producer.into_iter())
224        }
225    }
226}