futures_util/stream/futures_unordered.rs
//! An unbounded set of futures.

use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
use std::sync::atomic::{AtomicPtr, AtomicBool};
use std::sync::{Arc, Weak};
use std::usize;

use futures_core::{Stream, Future, Poll, Async, IntoFuture};
use futures_core::task::{self, AtomicWaker, UnsafeWake, Waker};

/// A set of `Future`s which may complete in any order.
///
/// This structure is optimized to manage a large number of futures.
/// Futures managed by `FuturesUnordered` will only be polled when they
/// generate notifications. This reduces the amount of work needed to poll
/// large numbers of futures.
///
/// `FuturesUnordered` can be filled by `collect`ing an iterator of `Future`s
/// into a `FuturesUnordered`, or by `push`ing `Future`s onto an existing
/// `FuturesUnordered`. When new `Future`s are added, `poll_next` must be
/// called in order to begin receiving wakeups for them.
///
/// Note that you can create a ready-made `FuturesUnordered` via the
/// `futures_unordered` function in the `stream` module, or you can start with
/// an empty set with the `FuturesUnordered::new` constructor.
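///
/// A minimal usage sketch (marked `ignore` since it assumes the `futures`
/// facade re-exports this type along with `future::ok`):
///
/// ```ignore
/// use futures::future::ok;
/// use futures::stream::FuturesUnordered;
///
/// // Collect an iterator of futures into a set...
/// let mut set: FuturesUnordered<_> =
///     vec![ok::<u32, ()>(1), ok(2)].into_iter().collect();
/// // ...or push additional futures onto it later.
/// set.push(ok(3));
/// assert_eq!(set.len(), 3);
/// // Driving `set` as a `Stream` then yields each result as its future
/// // completes, in completion order.
/// ```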
#[must_use = "streams do nothing unless polled"]
pub struct FuturesUnordered<F> {
    inner: Arc<Inner<F>>,
    len: usize,
    head_all: *const Node<F>,
}

unsafe impl<T: Send> Send for FuturesUnordered<T> {}
unsafe impl<T: Sync> Sync for FuturesUnordered<T> {}

// FuturesUnordered is implemented using two linked lists. One which links all
// futures managed by a `FuturesUnordered` and one that tracks futures that have
// been scheduled for polling. The first linked list is not thread safe and is
// only accessed by the thread that owns the `FuturesUnordered` value. The
// second linked list is an implementation of the intrusive MPSC queue algorithm
// described by 1024cores.net.
//
// When a future is submitted to the set a node is allocated and inserted in
// both linked lists. The next call to `poll_next` will (eventually) see this
// node and call `poll` on the future.
//
// Before a managed future is polled, the waker in the task context is replaced
// with one that is aware of the specific future being run. This ensures that
// wake notifications generated by that specific future are visible to
// `FuturesUnordered`. When a notification is received, the node is scheduled
// for polling by being inserted into the concurrent linked list.
//
// Each node's reference count is carried by the `Arc` that owns it, and an
// `AtomicBool` (`queued`) tracks whether the node is currently inserted in the
// atomic queue. When the future is notified, it will only insert itself into
// the linked list if it isn't currently inserted.

struct Inner<T> {
    // The task using `FuturesUnordered`.
    parent: AtomicWaker,

    // Head/tail of the readiness queue
    head_readiness: AtomicPtr<Node<T>>,
    tail_readiness: UnsafeCell<*const Node<T>>,
    stub: Arc<Node<T>>,
}

struct Node<T> {
    // The future
    future: UnsafeCell<Option<T>>,

    // Next pointer for linked list tracking all active nodes
    next_all: UnsafeCell<*const Node<T>>,

    // Previous node in linked list tracking all active nodes
    prev_all: UnsafeCell<*const Node<T>>,

    // Next pointer in readiness queue
    next_readiness: AtomicPtr<Node<T>>,

    // Queue that we'll be enqueued to when notified
    queue: Weak<Inner<T>>,

    // Whether or not this node is currently in the mpsc queue.
    queued: AtomicBool,
}

// The result of `Inner::dequeue`. `Inconsistent` corresponds to the transient
// state in the 1024cores algorithm where a producer has swapped the queue head
// but has not yet linked its node, so the consumer must retry later.
enum Dequeue<T> {
    Data(*const Node<T>),
    Empty,
    Inconsistent,
}

impl<T> FuturesUnordered<T>
    where T: Future,
{
    /// Constructs a new, empty `FuturesUnordered`.
    ///
    /// The returned `FuturesUnordered` does not contain any futures and, in
    /// this state, `FuturesUnordered::poll_next` will return
    /// `Ok(Async::Ready(None))`.
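    ///
    /// A tiny sketch (`ignore`d, since it assumes the `Empty` future type from
    /// the `futures` facade):
    ///
    /// ```ignore
    /// use futures::future::Empty;
    /// use futures::stream::FuturesUnordered;
    ///
    /// let set = FuturesUnordered::<Empty<(), ()>>::new();
    /// assert!(set.is_empty());
    /// ```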
    pub fn new() -> FuturesUnordered<T> {
        let stub = Arc::new(Node {
            future: UnsafeCell::new(None),
            next_all: UnsafeCell::new(ptr::null()),
            prev_all: UnsafeCell::new(ptr::null()),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            queued: AtomicBool::new(true),
            queue: Weak::new(),
        });
        let stub_ptr = &*stub as *const Node<T>;
        let inner = Arc::new(Inner {
            parent: AtomicWaker::new(),
            head_readiness: AtomicPtr::new(stub_ptr as *mut _),
            tail_readiness: UnsafeCell::new(stub_ptr),
            stub: stub,
        });

        FuturesUnordered {
            len: 0,
            head_all: ptr::null_mut(),
            inner: inner,
        }
    }
}

impl<T> FuturesUnordered<T> {
    /// Returns the number of futures contained in the set.
    ///
    /// This represents the total number of in-flight futures.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if the set contains no futures.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Push a future into the set.
    ///
    /// This function submits the given future to the set for managing. This
    /// function will not call `poll` on the submitted future. The caller must
    /// ensure that `FuturesUnordered::poll_next` is called in order to receive
    /// task notifications.
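    ///
    /// A small sketch of the intended pattern (`ignore`d; `future::ok` is
    /// assumed to come from the `futures` facade):
    ///
    /// ```ignore
    /// use futures::future::ok;
    /// use futures::stream::FuturesUnordered;
    ///
    /// let mut set = FuturesUnordered::new();
    /// set.push(ok::<&str, ()>("hello"));
    /// // The future has only been stored, not polled; poll the set as a
    /// // `Stream` to drive it to completion.
    /// assert_eq!(set.len(), 1);
    /// ```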
    pub fn push(&mut self, future: T) {
        let node = Arc::new(Node {
            future: UnsafeCell::new(Some(future)),
            next_all: UnsafeCell::new(ptr::null_mut()),
            prev_all: UnsafeCell::new(ptr::null_mut()),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            queued: AtomicBool::new(true),
            queue: Arc::downgrade(&self.inner),
        });

        // Right now our node has a strong reference count of 1. We transfer
        // ownership of this reference count to our internal linked list
        // and we'll reclaim ownership through the `unlink` function below.
        let ptr = self.link(node);

        // We'll need to get the future "into the system" to start tracking it,
        // i.e. routing its wake notifications to us so we can track which
        // futures are ready. To do that we unconditionally enqueue it for
        // polling here.
        self.inner.enqueue(ptr);
    }

    /// Returns an iterator that allows modifying each future in the set.
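    ///
    /// A small sketch (`ignore`d; assumes `future::ok` from the `futures`
    /// facade):
    ///
    /// ```ignore
    /// use futures::future::ok;
    /// use futures::stream::FuturesUnordered;
    ///
    /// let mut set: FuturesUnordered<_> =
    ///     vec![ok::<u32, ()>(1), ok(2)].into_iter().collect();
    /// // Every future that has not yet completed is visible to the iterator.
    /// assert_eq!(set.iter_mut().count(), 2);
    /// ```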
    pub fn iter_mut(&mut self) -> IterMut<T> {
        IterMut {
            node: self.head_all,
            len: self.len,
            _marker: PhantomData
        }
    }

    fn release_node(&mut self, node: Arc<Node<T>>) {
        // The future is done; set the `queued` flag so that any later calls to
        // `notify` become no-ops for this node.
        let prev = node.queued.swap(true, SeqCst);

        // Drop the future, even if it hasn't finished yet. This is safe
        // because we're dropping the future on the thread that owns
        // `FuturesUnordered`, which correctly tracks T's lifetimes and such.
        unsafe {
            drop((*node.future.get()).take());
        }

        // If the queued flag was previously set then it means that this node
        // is still in our internal mpsc queue. We then transfer ownership
        // of our reference count to the mpsc queue, and it'll come along and
        // free it later, noticing that the future is `None`.
        //
        // If, however, the queued flag was *not* set then we're safe to
        // release our reference count on the internal node. The queued flag
        // was set above so all future `enqueue` operations will not actually
        // enqueue the node, so our node will never see the mpsc queue again.
        // The node itself will be deallocated once all reference counts have
        // been dropped by the various owning tasks elsewhere.
        if prev {
            mem::forget(node);
        }
    }

    /// Insert a new node into the internal linked list.
    fn link(&mut self, node: Arc<Node<T>>) -> *const Node<T> {
        let ptr = Arc::into_raw(node);
        unsafe {
            *(*ptr).next_all.get() = self.head_all;
            if !self.head_all.is_null() {
                *(*self.head_all).prev_all.get() = ptr;
            }
        }

        self.head_all = ptr;
        self.len += 1;
        ptr
    }

    /// Remove the node from the linked list tracking all nodes currently
    /// managed by `FuturesUnordered`.
    unsafe fn unlink(&mut self, node: *const Node<T>) -> Arc<Node<T>> {
        let node = Arc::from_raw(node);
        let next = *node.next_all.get();
        let prev = *node.prev_all.get();
        *node.next_all.get() = ptr::null_mut();
        *node.prev_all.get() = ptr::null_mut();

        if !next.is_null() {
            *(*next).prev_all.get() = prev;
        }

        if !prev.is_null() {
            *(*prev).next_all.get() = next;
        } else {
            self.head_all = next;
        }
        self.len -= 1;
        node
    }
}

impl<T> Stream for FuturesUnordered<T>
    where T: Future
{
    type Item = T::Item;
    type Error = T::Error;

    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<T::Item>, T::Error> {
        // Ensure `parent` is correctly set.
        self.inner.parent.register(cx.waker());

        loop {
            let node = match unsafe { self.inner.dequeue() } {
                Dequeue::Empty => {
                    if self.is_empty() {
                        return Ok(Async::Ready(None));
                    } else {
                        return Ok(Async::Pending);
                    }
                }
                Dequeue::Inconsistent => {
                    // At this point, it may be worth yielding the thread &
                    // spinning a few times... but for now, just yield using the
                    // task system.
                    cx.waker().wake();
                    return Ok(Async::Pending);
                }
                Dequeue::Data(node) => node,
            };

            debug_assert!(node != self.inner.stub());

            unsafe {
                let mut future = match (*(*node).future.get()).take() {
                    Some(future) => future,

                    // If the future has already gone away then we're just
                    // cleaning out this node. See the comment in
                    // `release_node` for more information, but we're basically
                    // just taking ownership of our reference count here.
                    None => {
                        let node = Arc::from_raw(node);
                        assert!((*node.next_all.get()).is_null());
                        assert!((*node.prev_all.get()).is_null());
                        continue
                    }
                };

                // Unset the `queued` flag... this must be done before polling.
                // This ensures that the future gets rescheduled if it is
                // notified **during** a call to `poll`.
                let prev = (*node).queued.swap(false, SeqCst);
                assert!(prev);

                // We're going to need to be very careful if the `poll`
                // function below panics. We need to (a) not leak memory and
                // (b) ensure that we still don't have any use-after-frees. To
                // manage this we do a few things:
                //
                // * This "bomb" here will call `release_node` if dropped
                //   abnormally. That way we'll be sure the memory management
                //   of the `node` is managed correctly.
                // * The future was extracted above (taken ownership). That way
                //   if it panics we're guaranteed that the future is
                //   dropped on this thread and doesn't accidentally get
                //   dropped on a different thread (bad).
                // * We unlink the node from our internal queue preemptively,
                //   on the assumption that `poll` may panic, in which case we
                //   want the node discarded regardless.
                struct Bomb<'a, T: 'a> {
                    queue: &'a mut FuturesUnordered<T>,
                    node: Option<Arc<Node<T>>>,
                }
                impl<'a, T> Drop for Bomb<'a, T> {
                    fn drop(&mut self) {
                        if let Some(node) = self.node.take() {
                            self.queue.release_node(node);
                        }
                    }
                }
                let mut bomb = Bomb {
                    node: Some(self.unlink(node)),
                    queue: self,
                };

                // Poll the underlying future with the appropriate `notify`
                // implementation. This is where much of the internal unsafety
                // stems from. The `notify` instance itself is basically just
                // our `Arc<Node<T>>` and tracks the mpsc queue of ready
                // futures.
                //
                // Critically though `Node<T>` won't actually access `T`, the
                // future, while it's floating around inside of `Task`
                // instances. These structs will basically just use `T` to size
                // the internal allocation, appropriately accessing fields and
                // deallocating the node if need be.
                let res = {
                    let notify = NodeToHandle(bomb.node.as_ref().unwrap());
                    let waker = Waker::from(notify);
                    let mut cx = cx.with_waker(&waker);
                    future.poll(&mut cx)
                };

                let ret = match res {
                    Ok(Async::Pending) => {
                        let node = bomb.node.take().unwrap();
                        *node.future.get() = Some(future);
                        bomb.queue.link(node);
                        continue
                    }
                    Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
                    Err(e) => Err(e),
                };
                return ret
            }
        }
    }
}

impl<T: Debug> Debug for FuturesUnordered<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "FuturesUnordered {{ ... }}")
    }
}

impl<T> Drop for FuturesUnordered<T> {
    fn drop(&mut self) {
        // When a `FuturesUnordered` is dropped we want to drop all futures
        // associated with it. At the same time though there may be tons of
        // `Task` handles flying around which contain `Node<T>` references
        // inside them. We'll let those naturally get deallocated when the
        // `Task` itself goes out of scope or gets notified.
        unsafe {
            while !self.head_all.is_null() {
                let head = self.head_all;
                let node = self.unlink(head);
                self.release_node(node);
            }
        }

        // Note that at this point we could still have a bunch of nodes in the
        // mpsc queue. None of those nodes, however, have futures associated
        // with them so they're safe to destroy on any thread. At this point
        // the `FuturesUnordered` struct, the owner of the one strong reference
        // to `Inner<T>`, will drop that strong reference. At that point
        // whichever thread releases the strong refcount last (be it this
        // thread or some other thread as part of an `upgrade`) will clear out
        // the mpsc queue and free all remaining nodes.
        //
        // While that freeing operation isn't guaranteed to happen here, it's
        // guaranteed to happen "promptly" as no more "blocking work" will
        // happen while there's a strong refcount held.
    }
}

impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
    fn from_iter<T>(iter: T) -> Self
    where
        T: IntoIterator<Item = F>,
    {
        let acc = FuturesUnordered::new();
        iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc })
    }
}

/// Mutable iterator over all futures in the unordered set.
#[derive(Debug)]
pub struct IterMut<'a, F: 'a> {
    node: *const Node<F>,
    len: usize,
    _marker: PhantomData<&'a mut FuturesUnordered<F>>
}

impl<'a, F> Iterator for IterMut<'a, F> {
    type Item = &'a mut F;

    fn next(&mut self) -> Option<&'a mut F> {
        if self.node.is_null() {
            return None;
        }
        unsafe {
            let future = (*(*self.node).future.get()).as_mut().unwrap();
            let next = *(*self.node).next_all.get();
            self.node = next;
            self.len -= 1;
            Some(future)
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len, Some(self.len))
    }
}

impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}

impl<T> Inner<T> {
    /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
    fn enqueue(&self, node: *const Node<T>) {
        unsafe {
            debug_assert!((*node).queued.load(Relaxed));

            // This action does not require any coordination
            (*node).next_readiness.store(ptr::null_mut(), Relaxed);

            // Note that these atomic orderings come from 1024cores
            let node = node as *mut _;
            let prev = self.head_readiness.swap(node, AcqRel);
            (*prev).next_readiness.store(node, Release);
        }
    }

    /// The dequeue function from the 1024cores intrusive MPSC queue algorithm.
    ///
    /// Note that this is unsafe as it requires mutual exclusion (only one
    /// thread can call this at a time) to be guaranteed elsewhere.
    unsafe fn dequeue(&self) -> Dequeue<T> {
        let mut tail = *self.tail_readiness.get();
        let mut next = (*tail).next_readiness.load(Acquire);

        if tail == self.stub() {
            if next.is_null() {
                return Dequeue::Empty;
            }

            *self.tail_readiness.get() = next;
            tail = next;
            next = (*next).next_readiness.load(Acquire);
        }

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            debug_assert!(tail != self.stub());
            return Dequeue::Data(tail);
        }

        if self.head_readiness.load(Acquire) as *const _ != tail {
            return Dequeue::Inconsistent;
        }

        self.enqueue(self.stub());

        next = (*tail).next_readiness.load(Acquire);

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        Dequeue::Inconsistent
    }

    fn stub(&self) -> *const Node<T> {
        &*self.stub
    }
}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Once we're in the destructor for `Inner<T>` we need to clear out the
        // mpsc queue of nodes if there's anything left in there.
        //
        // Note that each node has a strong reference count associated with it
        // which is owned by the mpsc queue. All nodes should have had their
        // futures dropped already by the `FuturesUnordered` destructor above,
        // so we're just pulling out nodes and dropping their refcounts.
        unsafe {
            loop {
                match self.dequeue() {
                    Dequeue::Empty => break,
                    Dequeue::Inconsistent => abort("inconsistent in drop"),
                    Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
                }
            }
        }
    }
}

#[allow(missing_debug_implementations)]
struct NodeToHandle<'a, T: 'a>(&'a Arc<Node<T>>);

impl<'a, T> Clone for NodeToHandle<'a, T> {
    fn clone(&self) -> Self {
        NodeToHandle(self.0)
    }
}

#[doc(hidden)]
impl<'a, T> From<NodeToHandle<'a, T>> for Waker {
    fn from(handle: NodeToHandle<'a, T>) -> Waker {
        unsafe {
            let ptr = handle.0.clone();
            let ptr = mem::transmute::<Arc<Node<T>>, *const ArcNode<T>>(ptr);
            Waker::new(hide_lt(ptr))
        }
    }
}

struct ArcNode<T>(PhantomData<T>);

// We should never touch `T` on any thread other than the one owning
// `FuturesUnordered`, so this should be a safe operation.
unsafe impl<T> Send for ArcNode<T> {}
unsafe impl<T> Sync for ArcNode<T> {}

unsafe impl<T> UnsafeWake for ArcNode<T> {
    unsafe fn clone_raw(&self) -> Waker {
        let me: *const ArcNode<T> = self;
        let me: *const *const ArcNode<T> = &me;
        let me = &*(me as *const Arc<Node<T>>);
        NodeToHandle(me).into()
    }

    unsafe fn drop_raw(&self) {
        let mut me: *const ArcNode<T> = self;
        let me = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>;
        ptr::drop_in_place(me);
    }

    unsafe fn wake(&self) {
        let me: *const ArcNode<T> = self;
        let me: *const *const ArcNode<T> = &me;
        let me = me as *const Arc<Node<T>>;
        Node::notify(&*me)
    }
}

unsafe fn hide_lt<T>(p: *const ArcNode<T>) -> *const UnsafeWake {
    mem::transmute(p as *const UnsafeWake)
}

impl<T> Node<T> {
    fn notify(me: &Arc<Node<T>>) {
        let inner = match me.queue.upgrade() {
            Some(inner) => inner,
            None => return,
        };

        // It's our job to notify the node that it's ready to get polled,
        // meaning that we need to enqueue it into the readiness queue. To
        // do this we flag that we're ready to be queued, and if successful
        // we then do the literal queueing operation, ensuring that we're
        // only queued once.
        //
        // Once the node is inserted we make sure to notify the parent task,
        // as it'll want to come along and pick up our node now.
        //
        // Note that we don't change the reference count of the node here,
        // we're just enqueueing the raw pointer. The `FuturesUnordered`
        // implementation guarantees that if we set the `queued` flag to true
        // then there's a reference count held by the main `FuturesUnordered`
        // queue still.
        let prev = me.queued.swap(true, SeqCst);
        if !prev {
            inner.enqueue(&**me);
            inner.parent.wake();
        }
    }
}

impl<T> Drop for Node<T> {
    fn drop(&mut self) {
        // A `Node<T>` may be sent across threads and kept alive for any
        // lifetime, regardless of `T`. This means that for memory safety we
        // can't actually touch `T` at any time except when we have a
        // reference to the `FuturesUnordered` itself.
        //
        // Consequently it *should* be the case that we always drop futures
        // from the `FuturesUnordered` instance, but this is a bomb in place to
        // catch any bugs in that logic.
        unsafe {
            if (*self.future.get()).is_some() {
                abort("future still here when dropping");
            }
        }
    }
}

fn abort(s: &str) -> ! {
    struct DoublePanic;

    impl Drop for DoublePanic {
        fn drop(&mut self) {
            panic!("panicking twice to abort the program");
        }
    }

    let _bomb = DoublePanic;
    panic!("{}", s);
}

/// Converts a list of futures into a `Stream` of results from the futures.
///
/// This function will take a list of futures (e.g. a vector, an iterator,
/// etc.), and return a stream. The stream will yield items as they become
/// available on the futures internally, in the order that they become
/// available. This function is similar to `buffer_unordered` in that it may
/// return items in a different order than in the list specified.
///
/// Note that the returned set can also be used to dynamically push more
/// futures into the set as they become available.
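///
/// A minimal sketch (`ignore`d; assumes `future::ok` and this function's
/// re-export through the `futures` facade):
///
/// ```ignore
/// use futures::future::ok;
/// use futures::stream::futures_unordered;
///
/// let set = futures_unordered(vec![ok::<u32, ()>(1), ok(2), ok(3)]);
/// assert_eq!(set.len(), 3);
/// // Poll `set` as a `Stream` to receive each value as it completes.
/// ```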
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
where
    I: IntoIterator,
    I::Item: IntoFuture,
{
    futures.into_iter().map(|f| f.into_future()).collect()
}