// futures_util/stream/futures_unordered.rs

//! An unbounded set of futures.

use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
use std::sync::atomic::{AtomicPtr, AtomicBool};
use std::sync::{Arc, Weak};
use std::usize;

use futures_core::{Stream, Future, Poll, Async, IntoFuture};
use futures_core::task::{self, AtomicWaker, UnsafeWake, Waker};

/// A set of `Future`s which may complete in any order.
///
/// This structure is optimized to manage a large number of futures.
/// Futures managed by `FuturesUnordered` will only be polled when they
/// generate notifications. This reduces the amount of work needed to poll
/// large numbers of futures.
///
/// `FuturesUnordered` can be filled by `collect`ing an iterator of `Future`s
/// into a `FuturesUnordered`, or by `push`ing `Future`s onto an existing
/// `FuturesUnordered`. When new `Future`s are added, `poll_next` must be
/// called in order to begin receiving wakeups for new `Future`s.
///
/// Note that you can create a ready-made `FuturesUnordered` via the
/// `futures_unordered` function in the `stream` module, or you can start with an
/// empty set with the `FuturesUnordered::new` constructor.
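///
/// # Examples
///
/// A minimal usage sketch; the `future::ok` helper and the crate paths shown
/// are assumptions and may differ between releases:
///
/// ```ignore
/// use futures::future;
/// use futures::stream::FuturesUnordered;
///
/// let mut set = FuturesUnordered::new();
/// set.push(future::ok::<u32, ()>(1));
/// set.push(future::ok::<u32, ()>(2));
/// assert_eq!(set.len(), 2);
///
/// // Polling the set as a `Stream` yields results in completion order,
/// // which is not necessarily the order the futures were pushed.
/// ```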
#[must_use = "streams do nothing unless polled"]
pub struct FuturesUnordered<F> {
    inner: Arc<Inner<F>>,
    len: usize,
    head_all: *const Node<F>,
}

unsafe impl<T: Send> Send for FuturesUnordered<T> {}
unsafe impl<T: Sync> Sync for FuturesUnordered<T> {}

// FuturesUnordered is implemented using two linked lists. One which links all
// futures managed by a `FuturesUnordered` and one that tracks futures that have
// been scheduled for polling. The first linked list is not thread safe and is
// only accessed by the thread that owns the `FuturesUnordered` value. The
// second linked list is an implementation of the intrusive MPSC queue algorithm
// described by 1024cores.net.
//
// When a future is submitted to the set a node is allocated and inserted in
// both linked lists. The next call to `poll` will (eventually) see this node
// and call `poll` on the future.
//
// Before a managed future is polled, the current task's `Wake` is replaced
// with one that is aware of the specific future being run. This ensures that
// task notifications generated by that specific future are visible to
// `FuturesUnordered`. When a notification is received, the node is scheduled
// for polling by being inserted into the concurrent linked list.
//
// Each node stores its state in two places: the `Arc` reference count (the
// number of outstanding handles to the node) and an `AtomicBool` flag tracking
// whether the node is currently inserted in the atomic queue. When the future
// is notified, it will only insert itself into the linked list if it isn't
// currently inserted.
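//
// As a rough sketch, with three nodes currently enqueued the readiness queue
// looks like this (oldest at the tail, newest at the head):
//
//     tail_readiness --> stub --> node A --> node B --> node C <-- head_readiness
//
// Producers (wakers, possibly on other threads) push at `head_readiness` with
// an atomic swap, while the single consumer (the thread polling the
// `FuturesUnordered`) advances `tail_readiness` along the `next_readiness`
// pointers.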

struct Inner<T> {
    // The task using `FuturesUnordered`.
    parent: AtomicWaker,

    // Head/tail of the readiness queue
    head_readiness: AtomicPtr<Node<T>>,
    tail_readiness: UnsafeCell<*const Node<T>>,
    stub: Arc<Node<T>>,
}

struct Node<T> {
    // The future
    future: UnsafeCell<Option<T>>,

    // Next pointer for linked list tracking all active nodes
    next_all: UnsafeCell<*const Node<T>>,

    // Previous node in linked list tracking all active nodes
    prev_all: UnsafeCell<*const Node<T>>,

    // Next pointer in readiness queue
    next_readiness: AtomicPtr<Node<T>>,

    // Queue that we'll be enqueued to when notified
    queue: Weak<Inner<T>>,

    // Whether or not this node is currently in the mpsc queue.
    queued: AtomicBool,
}

enum Dequeue<T> {
    Data(*const Node<T>),
    Empty,
    Inconsistent,
}

impl<T> FuturesUnordered<T>
    where T: Future,
{
    /// Constructs a new, empty `FuturesUnordered`
    ///
    /// The returned `FuturesUnordered` does not contain any futures and, in this
    /// state, `FuturesUnordered::poll_next` will return `Ok(Async::Ready(None))`.
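    ///
    /// # Examples
    ///
    /// A minimal sketch (the `future::ok` helper and crate paths are
    /// assumptions and may differ between releases):
    ///
    /// ```ignore
    /// use futures::future;
    /// use futures::stream::FuturesUnordered;
    ///
    /// let mut set = FuturesUnordered::new();
    /// assert!(set.is_empty());
    /// set.push(future::ok::<u32, ()>(1));
    /// assert_eq!(set.len(), 1);
    /// ```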
    pub fn new() -> FuturesUnordered<T> {
        let stub = Arc::new(Node {
            future: UnsafeCell::new(None),
            next_all: UnsafeCell::new(ptr::null()),
            prev_all: UnsafeCell::new(ptr::null()),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            queued: AtomicBool::new(true),
            queue: Weak::new(),
        });
        let stub_ptr = &*stub as *const Node<T>;
        let inner = Arc::new(Inner {
            parent: AtomicWaker::new(),
            head_readiness: AtomicPtr::new(stub_ptr as *mut _),
            tail_readiness: UnsafeCell::new(stub_ptr),
            stub: stub,
        });

        FuturesUnordered {
            len: 0,
            head_all: ptr::null_mut(),
            inner: inner,
        }
    }
}

impl<T> FuturesUnordered<T> {
    /// Returns the number of futures contained in the set.
    ///
    /// This represents the total number of in-flight futures.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if the set contains no futures
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Push a future into the set.
    ///
    /// This function submits the given future to the set for managing. This
    /// function will not call `poll` on the submitted future. The caller must
    /// ensure that `FuturesUnordered::poll_next` is called in order to receive task
    /// notifications.
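    ///
    /// # Examples
    ///
    /// A minimal sketch (the `future::ok` helper and crate paths are
    /// assumptions and may differ between releases):
    ///
    /// ```ignore
    /// use futures::future;
    /// use futures::stream::FuturesUnordered;
    ///
    /// let mut set = FuturesUnordered::new();
    /// set.push(future::ok::<u32, ()>(1));
    /// set.push(future::ok::<u32, ()>(2));
    /// assert_eq!(set.len(), 2);
    /// // The pushed futures are not polled until the set itself is polled
    /// // as a `Stream` via `poll_next`.
    /// ```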
    pub fn push(&mut self, future: T) {
        let node = Arc::new(Node {
            future: UnsafeCell::new(Some(future)),
            next_all: UnsafeCell::new(ptr::null_mut()),
            prev_all: UnsafeCell::new(ptr::null_mut()),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            queued: AtomicBool::new(true),
            queue: Arc::downgrade(&self.inner),
        });

        // Right now our node has a strong reference count of 1. We transfer
        // ownership of this reference count to our internal linked list
        // and we'll reclaim ownership through the `unlink` function below.
        let ptr = self.link(node);

        // We'll need to get the future "into the system" to start tracking it,
        // i.e. have its wakeup notifications routed to us so we can track
        // which futures are ready. To do that we unconditionally enqueue it
        // for polling here.
        self.inner.enqueue(ptr);
    }

    /// Returns an iterator that allows modifying each future in the set.
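    ///
    /// A minimal sketch (the `future::ok` helper and crate paths are
    /// assumptions and may differ between releases):
    ///
    /// ```ignore
    /// use futures::future;
    /// use futures::stream::FuturesUnordered;
    ///
    /// let mut set = FuturesUnordered::new();
    /// set.push(future::ok::<u32, ()>(1));
    /// for fut in set.iter_mut() {
    ///     // Each item is a `&mut` reference to a still-pending future.
    ///     let _ = fut;
    /// }
    /// ```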
    pub fn iter_mut(&mut self) -> IterMut<T> {
        IterMut {
            node: self.head_all,
            len: self.len,
            _marker: PhantomData
        }
    }

    fn release_node(&mut self, node: Arc<Node<T>>) {
        // The future is done, so set the queued flag. This will prevent
        // `notify` from doing any work in the future.
        let prev = node.queued.swap(true, SeqCst);

        // Drop the future, even if it hasn't finished yet. This is safe
        // because we're dropping the future on the thread that owns
        // `FuturesUnordered`, which correctly tracks T's lifetimes and such.
        unsafe {
            drop((*node.future.get()).take());
        }

        // If the queued flag was previously set then it means that this node
        // is still in our internal mpsc queue. We then transfer ownership
        // of our reference count to the mpsc queue, and it'll come along and
        // free it later, noticing that the future is `None`.
        //
        // If, however, the queued flag was *not* set then we're safe to
        // release our reference count on the internal node. The queued flag
        // was set above so all future `enqueue` operations will not actually
        // enqueue the node, so our node will never see the mpsc queue again.
        // The node itself will be deallocated once all reference counts have
        // been dropped by the various owning tasks elsewhere.
        if prev {
            mem::forget(node);
        }
    }

    /// Insert a new node into the internal linked list.
    fn link(&mut self, node: Arc<Node<T>>) -> *const Node<T> {
        let ptr = Arc::into_raw(node);
        unsafe {
            *(*ptr).next_all.get() = self.head_all;
            if !self.head_all.is_null() {
                *(*self.head_all).prev_all.get() = ptr;
            }
        }

        self.head_all = ptr;
        self.len += 1;
        ptr
    }

    /// Remove the node from the linked list tracking all nodes currently
    /// managed by `FuturesUnordered`.
    unsafe fn unlink(&mut self, node: *const Node<T>) -> Arc<Node<T>> {
        let node = Arc::from_raw(node);
        let next = *node.next_all.get();
        let prev = *node.prev_all.get();
        *node.next_all.get() = ptr::null_mut();
        *node.prev_all.get() = ptr::null_mut();

        if !next.is_null() {
            *(*next).prev_all.get() = prev;
        }

        if !prev.is_null() {
            *(*prev).next_all.get() = next;
        } else {
            self.head_all = next;
        }
        self.len -= 1;
        node
    }
}

impl<T> Stream for FuturesUnordered<T>
    where T: Future
{
    type Item = T::Item;
    type Error = T::Error;

    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<T::Item>, T::Error> {
        // Ensure `parent` is correctly set.
        self.inner.parent.register(cx.waker());

        loop {
            let node = match unsafe { self.inner.dequeue() } {
                Dequeue::Empty => {
                    if self.is_empty() {
                        return Ok(Async::Ready(None));
                    } else {
                        return Ok(Async::Pending)
                    }
                }
                Dequeue::Inconsistent => {
                    // At this point, it may be worth yielding the thread &
                    // spinning a few times... but for now, just yield using the
                    // task system.
                    cx.waker().wake();
                    return Ok(Async::Pending);
                }
                Dequeue::Data(node) => node,
            };

            debug_assert!(node != self.inner.stub());

            unsafe {
                let mut future = match (*(*node).future.get()).take() {
                    Some(future) => future,

                    // If the future has already gone away then we're just
                    // cleaning out this node. See the comment in
                    // `release_node` for more information, but we're basically
                    // just taking ownership of our reference count here.
                    None => {
                        let node = Arc::from_raw(node);
                        assert!((*node.next_all.get()).is_null());
                        assert!((*node.prev_all.get()).is_null());
                        continue
                    }
                };

                // Unset queued flag... this must be done before
                // polling. This ensures that the future gets
                // rescheduled if it is notified **during** a call
                // to `poll`.
                let prev = (*node).queued.swap(false, SeqCst);
                assert!(prev);

                // We're going to need to be very careful if the `poll`
                // function below panics. We need to (a) not leak memory and
                // (b) ensure that we still don't have any use-after-frees. To
                // manage this we do a few things:
                //
                // * This "bomb" here will call `release_node` if dropped
                //   abnormally. That way we'll be sure the memory management
                //   of the `node` is managed correctly.
                // * The future was extracted above (taken ownership). That way
                //   if it panics we're guaranteed that the future is
                //   dropped on this thread and doesn't accidentally get
                //   dropped on a different thread (bad).
                // * We unlink the node from our internal queue to preemptively
                //   assume it'll panic, in which case we'll want to discard it
                //   regardless.
                struct Bomb<'a, T: 'a> {
                    queue: &'a mut FuturesUnordered<T>,
                    node: Option<Arc<Node<T>>>,
                }
                impl<'a, T> Drop for Bomb<'a, T> {
                    fn drop(&mut self) {
                        if let Some(node) = self.node.take() {
                            self.queue.release_node(node);
                        }
                    }
                }
                let mut bomb = Bomb {
                    node: Some(self.unlink(node)),
                    queue: self,
                };

                // Poll the underlying future with the appropriate `notify`
                // implementation. This is where much of the internal unsafety
                // stems from. The `notify` instance itself is basically just
                // our `Arc<Node<T>>` and tracks the mpsc queue of ready
                // futures.
                //
                // Critically though, `Node<T>` won't actually access `T`, the
                // future, while it's floating around inside of `Waker`
                // instances. These structs will basically just use `T` to size
                // the internal allocation, appropriately accessing fields and
                // deallocating the node if need be.
                let res = {
                    let notify = NodeToHandle(bomb.node.as_ref().unwrap());
                    let waker = Waker::from(notify);
                    let mut cx = cx.with_waker(&waker);
                    future.poll(&mut cx)
                };

                let ret = match res {
                    Ok(Async::Pending) => {
                        let node = bomb.node.take().unwrap();
                        *node.future.get() = Some(future);
                        bomb.queue.link(node);
                        continue
                    }
                    Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
                    Err(e) => Err(e),
                };
                return ret
            }
        }
    }
}

impl<T: Debug> Debug for FuturesUnordered<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "FuturesUnordered {{ ... }}")
    }
}

impl<T> Drop for FuturesUnordered<T> {
    fn drop(&mut self) {
        // When a `FuturesUnordered` is dropped we want to drop all futures associated
        // with it. At the same time though there may be tons of `Waker` handles
        // flying around which contain `Node<T>` references inside them. We'll
        // let those naturally get deallocated when the `Waker` itself goes out
        // of scope or gets notified.
        unsafe {
            while !self.head_all.is_null() {
                let head = self.head_all;
                let node = self.unlink(head);
                self.release_node(node);
            }
        }

        // Note that at this point we could still have a bunch of nodes in the
        // mpsc queue. None of those nodes, however, have futures associated
        // with them so they're safe to destroy on any thread. At this point
        // the `FuturesUnordered` struct, the owner of the one strong reference
        // to `Inner<T>`, will drop that strong reference. At that point
        // whichever thread releases the strong refcount last (be it this
        // thread or some other thread as part of an `upgrade`) will clear out
        // the mpsc queue and free all remaining nodes.
        //
        // While that freeing operation isn't guaranteed to happen here, it's
        // guaranteed to happen "promptly" as no more "blocking work" will
        // happen while there's a strong refcount held.
    }
}

impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
    fn from_iter<T>(iter: T) -> Self
    where
        T: IntoIterator<Item = F>,
    {
        let acc = FuturesUnordered::new();
        iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc })
    }
}

#[derive(Debug)]
/// Mutable iterator over all futures in the unordered set.
pub struct IterMut<'a, F: 'a> {
    node: *const Node<F>,
    len: usize,
    _marker: PhantomData<&'a mut FuturesUnordered<F>>
}

impl<'a, F> Iterator for IterMut<'a, F> {
    type Item = &'a mut F;

    fn next(&mut self) -> Option<&'a mut F> {
        if self.node.is_null() {
            return None;
        }
        unsafe {
            let future = (*(*self.node).future.get()).as_mut().unwrap();
            let next = *(*self.node).next_all.get();
            self.node = next;
            self.len -= 1;
            Some(future)
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len, Some(self.len))
    }
}

impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}

impl<T> Inner<T> {
    /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
    fn enqueue(&self, node: *const Node<T>) {
        unsafe {
            debug_assert!((*node).queued.load(Relaxed));

            // This action does not require any coordination
            (*node).next_readiness.store(ptr::null_mut(), Relaxed);

            // Note that these atomic orderings come from 1024cores
            let node = node as *mut _;
            let prev = self.head_readiness.swap(node, AcqRel);
            (*prev).next_readiness.store(node, Release);
        }
    }

    /// The dequeue function from the 1024cores intrusive MPSC queue algorithm.
    ///
    /// Note that this is unsafe as it requires mutual exclusion (only one
    /// thread may call this at a time), which must be guaranteed elsewhere.
    unsafe fn dequeue(&self) -> Dequeue<T> {
        let mut tail = *self.tail_readiness.get();
        let mut next = (*tail).next_readiness.load(Acquire);

        if tail == self.stub() {
            if next.is_null() {
                return Dequeue::Empty;
            }

            *self.tail_readiness.get() = next;
            tail = next;
            next = (*next).next_readiness.load(Acquire);
        }

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            debug_assert!(tail != self.stub());
            return Dequeue::Data(tail);
        }

        if self.head_readiness.load(Acquire) as *const _ != tail {
            return Dequeue::Inconsistent;
        }

        self.enqueue(self.stub());

        next = (*tail).next_readiness.load(Acquire);

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        Dequeue::Inconsistent
    }

    fn stub(&self) -> *const Node<T> {
        &*self.stub
    }
}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Once we're in the destructor for `Inner<T>` we need to clear out the
        // mpsc queue of nodes if there's anything left in there.
        //
        // Note that each node has a strong reference count associated with it
        // which is owned by the mpsc queue. All nodes should have had their
        // futures dropped already by the `FuturesUnordered` destructor above,
        // so we're just pulling out nodes and dropping their refcounts.
        unsafe {
            loop {
                match self.dequeue() {
                    Dequeue::Empty => break,
                    Dequeue::Inconsistent => abort("inconsistent in drop"),
                    Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
                }
            }
        }
    }
}

#[allow(missing_debug_implementations)]
struct NodeToHandle<'a, T: 'a>(&'a Arc<Node<T>>);

impl<'a, T> Clone for NodeToHandle<'a, T> {
    fn clone(&self) -> Self {
        NodeToHandle(self.0)
    }
}

#[doc(hidden)]
impl<'a, T> From<NodeToHandle<'a, T>> for Waker {
    fn from(handle: NodeToHandle<'a, T>) -> Waker {
        unsafe {
            let ptr = handle.0.clone();
            let ptr = mem::transmute::<Arc<Node<T>>, *const ArcNode<T>>(ptr);
            Waker::new(hide_lt(ptr))
        }
    }
}

struct ArcNode<T>(PhantomData<T>);

// We should never touch `T` on any thread other than the one owning
// `FuturesUnordered`, so this should be a safe operation.
unsafe impl<T> Send for ArcNode<T> {}
unsafe impl<T> Sync for ArcNode<T> {}

unsafe impl<T> UnsafeWake for ArcNode<T> {
    unsafe fn clone_raw(&self) -> Waker {
        let me: *const ArcNode<T> = self;
        let me: *const *const ArcNode<T> = &me;
        let me = &*(me as *const Arc<Node<T>>);
        NodeToHandle(me).into()
    }

    unsafe fn drop_raw(&self) {
        let mut me: *const ArcNode<T> = self;
        let me = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>;
        ptr::drop_in_place(me);
    }

    unsafe fn wake(&self) {
        let me: *const ArcNode<T> = self;
        let me: *const *const ArcNode<T> = &me;
        let me = me as *const Arc<Node<T>>;
        Node::notify(&*me)
    }
}

unsafe fn hide_lt<T>(p: *const ArcNode<T>) -> *const UnsafeWake {
    mem::transmute(p as *const UnsafeWake)
}

impl<T> Node<T> {
    fn notify(me: &Arc<Node<T>>) {
        let inner = match me.queue.upgrade() {
            Some(inner) => inner,
            None => return,
        };

        // It's our job to notify the node that it's ready to get polled,
        // meaning that we need to enqueue it into the readiness queue. To
        // do this we flag that we're ready to be queued, and if successful
        // we then do the literal queueing operation, ensuring that we're
        // only queued once.
        //
        // Once the node is inserted we make sure to notify the parent task,
        // as it'll want to come along and pick up our node now.
        //
        // Note that we don't change the reference count of the node here;
        // we're just enqueueing the raw pointer. The `FuturesUnordered`
        // implementation guarantees that as long as the `queued` flag is set
        // there is still a reference count held by the main `FuturesUnordered`
        // queue.
        let prev = me.queued.swap(true, SeqCst);
        if !prev {
            inner.enqueue(&**me);
            inner.parent.wake();
        }
    }
}

impl<T> Drop for Node<T> {
    fn drop(&mut self) {
        // A `Node<T>` may be sent to, and dropped on, any thread and may
        // outlive its `FuturesUnordered`, regardless of `T`. This means that
        // for memory safety we can't actually touch `T` at any time except
        // when we have a reference to the `FuturesUnordered` itself.
        //
        // Consequently it *should* be the case that we always drop futures from
        // the `FuturesUnordered` instance, but this is a bomb in place to catch
        // any bugs in that logic.
        unsafe {
            if (*self.future.get()).is_some() {
                abort("future still here when dropping");
            }
        }
    }
}

fn abort(s: &str) -> ! {
    struct DoublePanic;

    impl Drop for DoublePanic {
        fn drop(&mut self) {
            panic!("panicking twice to abort the program");
        }
    }

    let _bomb = DoublePanic;
    panic!("{}", s);
}

/// Converts a list of futures into a `Stream` of results from the futures.
///
/// This function will take a list of futures (e.g. a vector, an iterator,
/// etc.) and return a stream. The stream will yield items as the underlying
/// futures complete, in the order in which they become available. This
/// function is similar to `buffer_unordered` in that it may return items in a
/// different order than in the list specified.
///
/// Note that the returned set can also be used to dynamically push more
/// futures into the set as they become available.
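///
/// # Examples
///
/// A minimal sketch (the `future::ok` helper and crate paths are assumptions
/// and may differ between releases):
///
/// ```ignore
/// use futures::future;
/// use futures::stream::futures_unordered;
///
/// let set = futures_unordered(vec![
///     future::ok::<u32, ()>(1),
///     future::ok::<u32, ()>(2),
/// ]);
/// assert_eq!(set.len(), 2);
/// ```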
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
where
    I: IntoIterator,
    I::Item: IntoFuture,
{
    futures.into_iter().map(|f| f.into_future()).collect()
}