tokio_stream/
stream_map.rs

1use crate::Stream;
2
3use std::borrow::Borrow;
4use std::future::poll_fn;
5use std::hash::Hash;
6use std::pin::Pin;
7use std::task::{ready, Context, Poll};
8
9/// Combine many streams into one, indexing each source stream with a unique
10/// key.
11///
12/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
13/// streams into a single merged stream that yields values in the order that
14/// they arrive from the source streams. However, `StreamMap` has a lot more
15/// flexibility in usage patterns.
16///
17/// `StreamMap` can:
18///
19/// * Merge an arbitrary number of streams.
20/// * Track which source stream the value was received from.
21/// * Handle inserting and removing streams from the set of managed streams at
22///   any point during iteration.
23///
24/// All source streams held by `StreamMap` are indexed using a key. This key is
25/// included with the value when a source stream yields a value. The key is also
26/// used to remove the stream from the `StreamMap` before the stream has
27/// completed streaming.
28///
29/// # `Unpin`
30///
31/// Because the `StreamMap` API moves streams during runtime, both streams and
32/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
33/// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
34/// pin the stream in the heap.
35///
36/// # Implementation
37///
38/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
39/// internal implementation detail will persist in future versions, but it is
40/// important to know the runtime implications. In general, `StreamMap` works
41/// best with a "smallish" number of streams as all entries are scanned on
42/// insert, remove, and polling. In cases where a large number of streams need
43/// to be merged, it may be advisable to use tasks sending values on a shared
44/// [`mpsc`] channel.
45///
46/// # Notes
47///
48/// `StreamMap` removes finished streams automatically, without alerting the user.
49/// In some scenarios, the caller would want to know on closed streams.
50/// To do this, use [`StreamNotifyClose`] as a wrapper to your stream.
51/// It will return None when the stream is closed.
52///
53/// [`StreamExt::merge`]: crate::StreamExt::merge
54/// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
55/// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
56/// [`Box::pin`]: std::boxed::Box::pin
57/// [`StreamNotifyClose`]: crate::StreamNotifyClose
58///
59/// # Examples
60///
61/// Merging two streams, then remove them after receiving the first value
62///
63/// ```
64/// use tokio_stream::{StreamExt, StreamMap, Stream};
65/// use tokio::sync::mpsc;
66/// use std::pin::Pin;
67///
68/// #[tokio::main]
69/// async fn main() {
70///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
71///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
72///
73///     // Convert the channels to a `Stream`.
74///     let rx1 = Box::pin(async_stream::stream! {
75///           while let Some(item) = rx1.recv().await {
76///               yield item;
77///           }
78///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
79///
80///     let rx2 = Box::pin(async_stream::stream! {
81///           while let Some(item) = rx2.recv().await {
82///               yield item;
83///           }
84///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
85///
86///     tokio::spawn(async move {
87///         tx1.send(1).await.unwrap();
88///
89///         // This value will never be received. The send may or may not return
90///         // `Err` depending on if the remote end closed first or not.
91///         let _ = tx1.send(2).await;
92///     });
93///
94///     tokio::spawn(async move {
95///         tx2.send(3).await.unwrap();
96///         let _ = tx2.send(4).await;
97///     });
98///
99///     let mut map = StreamMap::new();
100///
101///     // Insert both streams
102///     map.insert("one", rx1);
103///     map.insert("two", rx2);
104///
105///     // Read twice
106///     for _ in 0..2 {
107///         let (key, val) = map.next().await.unwrap();
108///
109///         if key == "one" {
110///             assert_eq!(val, 1);
111///         } else {
112///             assert_eq!(val, 3);
113///         }
114///
115///         // Remove the stream to prevent reading the next value
116///         map.remove(key);
117///     }
118/// }
119/// ```
120///
121/// This example models a read-only client to a chat system with channels. The
122/// client sends commands to join and leave channels. `StreamMap` is used to
123/// manage active channel subscriptions.
124///
125/// For simplicity, messages are displayed with `println!`, but they could be
126/// sent to the client over a socket.
127///
128/// ```no_run
129/// use tokio_stream::{Stream, StreamExt, StreamMap};
130///
131/// enum Command {
132///     Join(String),
133///     Leave(String),
134/// }
135///
136/// fn commands() -> impl Stream<Item = Command> {
137///     // Streams in user commands by parsing `stdin`.
138/// # tokio_stream::pending()
139/// }
140///
141/// // Join a channel, returns a stream of messages received on the channel.
142/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
143///     // left as an exercise to the reader
144/// # tokio_stream::pending()
145/// }
146///
147/// #[tokio::main]
148/// async fn main() {
149///     let mut channels = StreamMap::new();
150///
151///     // Input commands (join / leave channels).
152///     let cmds = commands();
153///     tokio::pin!(cmds);
154///
155///     loop {
156///         tokio::select! {
157///             Some(cmd) = cmds.next() => {
158///                 match cmd {
159///                     Command::Join(chan) => {
160///                         // Join the channel and add it to the `channels`
161///                         // stream map
162///                         let msgs = join(&chan);
163///                         channels.insert(chan, msgs);
164///                     }
165///                     Command::Leave(chan) => {
166///                         channels.remove(&chan);
167///                     }
168///                 }
169///             }
170///             Some((chan, msg)) = channels.next() => {
171///                 // Received a message, display it on stdout with the channel
172///                 // it originated from.
173///                 println!("{}: {}", chan, msg);
174///             }
175///             // Both the `commands` stream and the `channels` stream are
176///             // complete. There is no more work to do, so leave the loop.
177///             else => break,
178///         }
179///     }
180/// }
181/// ```
182///
183/// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
184///
185/// ```
186/// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
187///
188/// #[tokio::main]
189/// async fn main() {
190///     let mut map = StreamMap::new();
191///     let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
192///     let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
193///     map.insert(0, stream);
194///     map.insert(1, stream2);
195///     while let Some((key, val)) = map.next().await {
196///         match val {
197///             Some(val) => println!("got {val:?} from stream {key:?}"),
198///             None => println!("stream {key:?} closed"),
199///         }
200///     }
201/// }
202/// ```
203
204#[derive(Debug)]
205pub struct StreamMap<K, V> {
206    /// Streams stored in the map
207    entries: Vec<(K, V)>,
208}
209
210impl<K, V> StreamMap<K, V> {
211    /// An iterator visiting all key-value pairs in arbitrary order.
212    ///
213    /// The iterator element type is `&'a (K, V)`.
214    ///
215    /// # Examples
216    ///
217    /// ```
218    /// use tokio_stream::{StreamMap, pending};
219    ///
220    /// let mut map = StreamMap::new();
221    ///
222    /// map.insert("a", pending::<i32>());
223    /// map.insert("b", pending());
224    /// map.insert("c", pending());
225    ///
226    /// for (key, stream) in map.iter() {
227    ///     println!("({}, {:?})", key, stream);
228    /// }
229    /// ```
230    pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
231        self.entries.iter()
232    }
233
234    /// An iterator visiting all key-value pairs mutably in arbitrary order.
235    ///
236    /// The iterator element type is `&'a mut (K, V)`.
237    ///
238    /// # Examples
239    ///
240    /// ```
241    /// use tokio_stream::{StreamMap, pending};
242    ///
243    /// let mut map = StreamMap::new();
244    ///
245    /// map.insert("a", pending::<i32>());
246    /// map.insert("b", pending());
247    /// map.insert("c", pending());
248    ///
249    /// for (key, stream) in map.iter_mut() {
250    ///     println!("({}, {:?})", key, stream);
251    /// }
252    /// ```
253    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
254        self.entries.iter_mut()
255    }
256
257    /// Creates an empty `StreamMap`.
258    ///
259    /// The stream map is initially created with a capacity of `0`, so it will
260    /// not allocate until it is first inserted into.
261    ///
262    /// # Examples
263    ///
264    /// ```
265    /// use tokio_stream::{StreamMap, Pending};
266    ///
267    /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
268    /// ```
269    pub fn new() -> StreamMap<K, V> {
270        StreamMap { entries: vec![] }
271    }
272
273    /// Creates an empty `StreamMap` with the specified capacity.
274    ///
275    /// The stream map will be able to hold at least `capacity` elements without
276    /// reallocating. If `capacity` is 0, the stream map will not allocate.
277    ///
278    /// # Examples
279    ///
280    /// ```
281    /// use tokio_stream::{StreamMap, Pending};
282    ///
283    /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
284    /// ```
285    pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
286        StreamMap {
287            entries: Vec::with_capacity(capacity),
288        }
289    }
290
291    /// Returns an iterator visiting all keys in arbitrary order.
292    ///
293    /// The iterator element type is `&'a K`.
294    ///
295    /// # Examples
296    ///
297    /// ```
298    /// use tokio_stream::{StreamMap, pending};
299    ///
300    /// let mut map = StreamMap::new();
301    ///
302    /// map.insert("a", pending::<i32>());
303    /// map.insert("b", pending());
304    /// map.insert("c", pending());
305    ///
306    /// for key in map.keys() {
307    ///     println!("{}", key);
308    /// }
309    /// ```
310    pub fn keys(&self) -> impl Iterator<Item = &K> {
311        self.iter().map(|(k, _)| k)
312    }
313
314    /// An iterator visiting all values in arbitrary order.
315    ///
316    /// The iterator element type is `&'a V`.
317    ///
318    /// # Examples
319    ///
320    /// ```
321    /// use tokio_stream::{StreamMap, pending};
322    ///
323    /// let mut map = StreamMap::new();
324    ///
325    /// map.insert("a", pending::<i32>());
326    /// map.insert("b", pending());
327    /// map.insert("c", pending());
328    ///
329    /// for stream in map.values() {
330    ///     println!("{:?}", stream);
331    /// }
332    /// ```
333    pub fn values(&self) -> impl Iterator<Item = &V> {
334        self.iter().map(|(_, v)| v)
335    }
336
337    /// An iterator visiting all values mutably in arbitrary order.
338    ///
339    /// The iterator element type is `&'a mut V`.
340    ///
341    /// # Examples
342    ///
343    /// ```
344    /// use tokio_stream::{StreamMap, pending};
345    ///
346    /// let mut map = StreamMap::new();
347    ///
348    /// map.insert("a", pending::<i32>());
349    /// map.insert("b", pending());
350    /// map.insert("c", pending());
351    ///
352    /// for stream in map.values_mut() {
353    ///     println!("{:?}", stream);
354    /// }
355    /// ```
356    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
357        self.iter_mut().map(|(_, v)| v)
358    }
359
360    /// Returns the number of streams the map can hold without reallocating.
361    ///
362    /// This number is a lower bound; the `StreamMap` might be able to hold
363    /// more, but is guaranteed to be able to hold at least this many.
364    ///
365    /// # Examples
366    ///
367    /// ```
368    /// use tokio_stream::{StreamMap, Pending};
369    ///
370    /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
371    /// assert!(map.capacity() >= 100);
372    /// ```
373    pub fn capacity(&self) -> usize {
374        self.entries.capacity()
375    }
376
377    /// Returns the number of streams in the map.
378    ///
379    /// # Examples
380    ///
381    /// ```
382    /// use tokio_stream::{StreamMap, pending};
383    ///
384    /// let mut a = StreamMap::new();
385    /// assert_eq!(a.len(), 0);
386    /// a.insert(1, pending::<i32>());
387    /// assert_eq!(a.len(), 1);
388    /// ```
389    pub fn len(&self) -> usize {
390        self.entries.len()
391    }
392
393    /// Returns `true` if the map contains no elements.
394    ///
395    /// # Examples
396    ///
397    /// ```
398    /// use tokio_stream::{StreamMap, pending};
399    ///
400    /// let mut a = StreamMap::new();
401    /// assert!(a.is_empty());
402    /// a.insert(1, pending::<i32>());
403    /// assert!(!a.is_empty());
404    /// ```
405    pub fn is_empty(&self) -> bool {
406        self.entries.is_empty()
407    }
408
409    /// Clears the map, removing all key-stream pairs. Keeps the allocated
410    /// memory for reuse.
411    ///
412    /// # Examples
413    ///
414    /// ```
415    /// use tokio_stream::{StreamMap, pending};
416    ///
417    /// let mut a = StreamMap::new();
418    /// a.insert(1, pending::<i32>());
419    /// a.clear();
420    /// assert!(a.is_empty());
421    /// ```
422    pub fn clear(&mut self) {
423        self.entries.clear();
424    }
425
426    /// Insert a key-stream pair into the map.
427    ///
428    /// If the map did not have this key present, `None` is returned.
429    ///
430    /// If the map did have this key present, the new `stream` replaces the old
431    /// one and the old stream is returned.
432    ///
433    /// # Examples
434    ///
435    /// ```
436    /// use tokio_stream::{StreamMap, pending};
437    ///
438    /// let mut map = StreamMap::new();
439    ///
440    /// assert!(map.insert(37, pending::<i32>()).is_none());
441    /// assert!(!map.is_empty());
442    ///
443    /// map.insert(37, pending());
444    /// assert!(map.insert(37, pending()).is_some());
445    /// ```
446    pub fn insert(&mut self, k: K, stream: V) -> Option<V>
447    where
448        K: Hash + Eq,
449    {
450        let ret = self.remove(&k);
451        self.entries.push((k, stream));
452
453        ret
454    }
455
456    /// Removes a key from the map, returning the stream at the key if the key was previously in the map.
457    ///
458    /// The key may be any borrowed form of the map's key type, but `Hash` and
459    /// `Eq` on the borrowed form must match those for the key type.
460    ///
461    /// # Examples
462    ///
463    /// ```
464    /// use tokio_stream::{StreamMap, pending};
465    ///
466    /// let mut map = StreamMap::new();
467    /// map.insert(1, pending::<i32>());
468    /// assert!(map.remove(&1).is_some());
469    /// assert!(map.remove(&1).is_none());
470    /// ```
471    pub fn remove<Q>(&mut self, k: &Q) -> Option<V>
472    where
473        K: Borrow<Q>,
474        Q: Hash + Eq + ?Sized,
475    {
476        for i in 0..self.entries.len() {
477            if self.entries[i].0.borrow() == k {
478                return Some(self.entries.swap_remove(i).1);
479            }
480        }
481
482        None
483    }
484
485    /// Returns `true` if the map contains a stream for the specified key.
486    ///
487    /// The key may be any borrowed form of the map's key type, but `Hash` and
488    /// `Eq` on the borrowed form must match those for the key type.
489    ///
490    /// # Examples
491    ///
492    /// ```
493    /// use tokio_stream::{StreamMap, pending};
494    ///
495    /// let mut map = StreamMap::new();
496    /// map.insert(1, pending::<i32>());
497    /// assert_eq!(map.contains_key(&1), true);
498    /// assert_eq!(map.contains_key(&2), false);
499    /// ```
500    pub fn contains_key<Q>(&self, k: &Q) -> bool
501    where
502        K: Borrow<Q>,
503        Q: Hash + Eq + ?Sized,
504    {
505        for i in 0..self.entries.len() {
506            if self.entries[i].0.borrow() == k {
507                return true;
508            }
509        }
510
511        false
512    }
513}
514
515impl<K, V> StreamMap<K, V>
516where
517    K: Unpin,
518    V: Stream + Unpin,
519{
520    /// Polls the next value, includes the vec entry index
521    fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
522        let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
523        let mut idx = start;
524
525        for _ in 0..self.entries.len() {
526            let (_, stream) = &mut self.entries[idx];
527
528            match Pin::new(stream).poll_next(cx) {
529                Poll::Ready(Some(val)) => return Poll::Ready(Some((idx, val))),
530                Poll::Ready(None) => {
531                    // Remove the entry
532                    self.entries.swap_remove(idx);
533
534                    // Check if this was the last entry, if so the cursor needs
535                    // to wrap
536                    if idx == self.entries.len() {
537                        idx = 0;
538                    } else if idx < start && start <= self.entries.len() {
539                        // The stream being swapped into the current index has
540                        // already been polled, so skip it.
541                        idx = idx.wrapping_add(1) % self.entries.len();
542                    }
543                }
544                Poll::Pending => {
545                    idx = idx.wrapping_add(1) % self.entries.len();
546                }
547            }
548        }
549
550        // If the map is empty, then the stream is complete.
551        if self.entries.is_empty() {
552            Poll::Ready(None)
553        } else {
554            Poll::Pending
555        }
556    }
557}
558
559impl<K, V> Default for StreamMap<K, V> {
560    fn default() -> Self {
561        Self::new()
562    }
563}
564
565impl<K, V> StreamMap<K, V>
566where
567    K: Clone + Unpin,
568    V: Stream + Unpin,
569{
570    /// Receives multiple items on this [`StreamMap`], extending the provided `buffer`.
571    ///
572    /// This method returns the number of items that is appended to the `buffer`.
573    ///
574    /// Note that this method does not guarantee that exactly `limit` items
575    /// are received. Rather, if at least one item is available, it returns
576    /// as many items as it can up to the given limit. This method returns
577    /// zero only if the `StreamMap` is empty (or if `limit` is zero).
578    ///
579    /// # Cancel safety
580    ///
581    /// This method is cancel safe. If `next_many` is used as the event in a
582    /// [`tokio::select!`](tokio::select) statement and some other branch
583    /// completes first, it is guaranteed that no items were received on any of
584    /// the underlying streams.
585    pub async fn next_many(&mut self, buffer: &mut Vec<(K, V::Item)>, limit: usize) -> usize {
586        poll_fn(|cx| self.poll_next_many(cx, buffer, limit)).await
587    }
588
589    /// Polls to receive multiple items on this `StreamMap`, extending the provided `buffer`.
590    ///
591    /// This method returns:
592    /// * `Poll::Pending` if no items are available but the `StreamMap` is not empty.
593    /// * `Poll::Ready(count)` where `count` is the number of items successfully received and
594    ///   stored in `buffer`. This can be less than, or equal to, `limit`.
595    /// * `Poll::Ready(0)` if `limit` is set to zero or when the `StreamMap` is empty.
596    ///
597    /// Note that this method does not guarantee that exactly `limit` items
598    /// are received. Rather, if at least one item is available, it returns
599    /// as many items as it can up to the given limit. This method returns
600    /// zero only if the `StreamMap` is empty (or if `limit` is zero).
601    pub fn poll_next_many(
602        &mut self,
603        cx: &mut Context<'_>,
604        buffer: &mut Vec<(K, V::Item)>,
605        limit: usize,
606    ) -> Poll<usize> {
607        if limit == 0 || self.entries.is_empty() {
608            return Poll::Ready(0);
609        }
610
611        let mut added = 0;
612
613        let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
614        let mut idx = start;
615
616        while added < limit {
617            // Indicates whether at least one stream returned a value when polled or not
618            let mut should_loop = false;
619
620            for _ in 0..self.entries.len() {
621                let (_, stream) = &mut self.entries[idx];
622
623                match Pin::new(stream).poll_next(cx) {
624                    Poll::Ready(Some(val)) => {
625                        added += 1;
626
627                        let key = self.entries[idx].0.clone();
628                        buffer.push((key, val));
629
630                        should_loop = true;
631
632                        idx = idx.wrapping_add(1) % self.entries.len();
633                    }
634                    Poll::Ready(None) => {
635                        // Remove the entry
636                        self.entries.swap_remove(idx);
637
638                        // Check if this was the last entry, if so the cursor needs
639                        // to wrap
640                        if idx == self.entries.len() {
641                            idx = 0;
642                        } else if idx < start && start <= self.entries.len() {
643                            // The stream being swapped into the current index has
644                            // already been polled, so skip it.
645                            idx = idx.wrapping_add(1) % self.entries.len();
646                        }
647                    }
648                    Poll::Pending => {
649                        idx = idx.wrapping_add(1) % self.entries.len();
650                    }
651                }
652            }
653
654            if !should_loop {
655                break;
656            }
657        }
658
659        if added > 0 {
660            Poll::Ready(added)
661        } else if self.entries.is_empty() {
662            Poll::Ready(0)
663        } else {
664            Poll::Pending
665        }
666    }
667}
668
669impl<K, V> Stream for StreamMap<K, V>
670where
671    K: Clone + Unpin,
672    V: Stream + Unpin,
673{
674    type Item = (K, V::Item);
675
676    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
677        if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
678            let key = self.entries[idx].0.clone();
679            Poll::Ready(Some((key, val)))
680        } else {
681            Poll::Ready(None)
682        }
683    }
684
685    fn size_hint(&self) -> (usize, Option<usize>) {
686        let mut ret = (0, Some(0));
687
688        for (_, stream) in &self.entries {
689            let hint = stream.size_hint();
690
691            ret.0 += hint.0;
692
693            match (ret.1, hint.1) {
694                (Some(a), Some(b)) => ret.1 = Some(a + b),
695                (Some(_), None) => ret.1 = None,
696                _ => {}
697            }
698        }
699
700        ret
701    }
702}
703
704impl<K, V> FromIterator<(K, V)> for StreamMap<K, V>
705where
706    K: Hash + Eq,
707{
708    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
709        let iterator = iter.into_iter();
710        let (lower_bound, _) = iterator.size_hint();
711        let mut stream_map = Self::with_capacity(lower_bound);
712
713        for (key, value) in iterator {
714            stream_map.insert(key, value);
715        }
716
717        stream_map
718    }
719}
720
721impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
722    fn extend<T>(&mut self, iter: T)
723    where
724        T: IntoIterator<Item = (K, V)>,
725    {
726        self.entries.extend(iter);
727    }
728}
729
730mod rand {
731    use std::cell::Cell;
732
733    mod loom {
734        #[cfg(not(loom))]
735        pub(crate) mod rand {
736            use std::collections::hash_map::RandomState;
737            use std::hash::{BuildHasher, Hash, Hasher};
738            use std::sync::atomic::AtomicU32;
739            use std::sync::atomic::Ordering::Relaxed;
740
741            static COUNTER: AtomicU32 = AtomicU32::new(1);
742
743            pub(crate) fn seed() -> u64 {
744                let rand_state = RandomState::new();
745
746                let mut hasher = rand_state.build_hasher();
747
748                // Hash some unique-ish data to generate some new state
749                COUNTER.fetch_add(1, Relaxed).hash(&mut hasher);
750
751                // Get the seed
752                hasher.finish()
753            }
754        }
755
756        #[cfg(loom)]
757        pub(crate) mod rand {
758            pub(crate) fn seed() -> u64 {
759                1
760            }
761        }
762    }
763
764    /// Fast random number generate
765    ///
766    /// Implement `xorshift64+`: 2 32-bit `xorshift` sequences added together.
767    /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
768    /// `Xorshift` paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf>
769    /// This generator passes the SmallCrush suite, part of TestU01 framework:
770    /// <http://simul.iro.umontreal.ca/testu01/tu01.html>
771    #[derive(Debug)]
772    pub(crate) struct FastRand {
773        one: Cell<u32>,
774        two: Cell<u32>,
775    }
776
777    impl FastRand {
778        /// Initialize a new, thread-local, fast random number generator.
779        pub(crate) fn new(seed: u64) -> FastRand {
780            let one = (seed >> 32) as u32;
781            let mut two = seed as u32;
782
783            if two == 0 {
784                // This value cannot be zero
785                two = 1;
786            }
787
788            FastRand {
789                one: Cell::new(one),
790                two: Cell::new(two),
791            }
792        }
793
794        pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
795            // This is similar to fastrand() % n, but faster.
796            // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
797            let mul = (self.fastrand() as u64).wrapping_mul(n as u64);
798            (mul >> 32) as u32
799        }
800
801        fn fastrand(&self) -> u32 {
802            let mut s1 = self.one.get();
803            let s0 = self.two.get();
804
805            s1 ^= s1 << 17;
806            s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16;
807
808            self.one.set(s0);
809            self.two.set(s1);
810
811            s0.wrapping_add(s1)
812        }
813    }
814
815    // Used by `StreamMap`
816    pub(crate) fn thread_rng_n(n: u32) -> u32 {
817        thread_local! {
818            static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
819        }
820
821        THREAD_RNG.with(|rng| rng.fastrand_n(n))
822    }
823}