futures_bounded/
futures_tuple_set.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::task::{ready, Context, Poll};
4use std::time::Duration;
5
6use futures_util::future::BoxFuture;
7
8use crate::{FuturesMap, PushError, Timeout};
9
10/// Represents a list of tuples of a [Future] and an associated piece of data.
11///
12/// Each future must finish within the specified time and the list never outgrows its capacity.
13pub struct FuturesTupleSet<O, D> {
14    id: u32,
15    inner: FuturesMap<u32, O>,
16    data: HashMap<u32, D>,
17}
18
19impl<O, D> FuturesTupleSet<O, D> {
20    pub fn new(timeout: Duration, capacity: usize) -> Self {
21        Self {
22            id: 0,
23            inner: FuturesMap::new(timeout, capacity),
24            data: HashMap::new(),
25        }
26    }
27}
28
29impl<O, D> FuturesTupleSet<O, D>
30where
31    O: 'static,
32{
33    /// Push a future into the list.
34    ///
35    /// This method adds the given future to the list.
36    /// If the length of the list is equal to the capacity, this method returns a error that contains the passed future.
37    /// In that case, the future is not added to the set.
38    pub fn try_push<F>(&mut self, future: F, data: D) -> Result<(), (BoxFuture<O>, D)>
39    where
40        F: Future<Output = O> + Send + 'static,
41    {
42        self.id = self.id.wrapping_add(1);
43
44        match self.inner.try_push(self.id, future) {
45            Ok(()) => {}
46            Err(PushError::BeyondCapacity(w)) => return Err((w, data)),
47            Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"),
48        }
49        self.data.insert(self.id, data);
50
51        Ok(())
52    }
53
54    pub fn len(&self) -> usize {
55        self.inner.len()
56    }
57
58    pub fn is_empty(&self) -> bool {
59        self.inner.is_empty()
60    }
61
62    pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> {
63        self.inner.poll_ready_unpin(cx)
64    }
65
66    pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(Result<O, Timeout>, D)> {
67        let (id, res) = ready!(self.inner.poll_unpin(cx));
68        let data = self.data.remove(&id).expect("must have data for future");
69
70        Poll::Ready((res, data))
71    }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::future::poll_fn;
    use futures_util::FutureExt;
    use std::future::ready;

    /// Each finished future must be handed back together with the data it was pushed with.
    #[test]
    fn tracks_associated_data_of_future() {
        let mut set = FuturesTupleSet::new(Duration::from_secs(10), 10);

        // Every future resolves to the same value as its associated data.
        for value in [1, 2] {
            let _ = set.try_push(ready(value), value);
        }

        // The test expects each poll to resolve immediately with a matching pair.
        for _ in 0..2 {
            let (output, data) = poll_fn(|cx| set.poll_unpin(cx)).now_or_never().unwrap();
            assert_eq!(output.unwrap(), data);
        }
    }
}