futures_bounded/
stream_set.rs

1use futures_util::stream::BoxStream;
2use futures_util::Stream;
3use std::task::{ready, Context, Poll};
4use std::time::Duration;
5
6use crate::{PushError, StreamMap, Timeout};
7
8/// Represents a set of [Stream]s.
9///
10/// Each stream must finish within the specified time and the list never outgrows its capacity.
11pub struct StreamSet<O> {
12    id: u32,
13    inner: StreamMap<u32, O>,
14}
15
16impl<O> StreamSet<O> {
17    pub fn new(timeout: Duration, capacity: usize) -> Self {
18        Self {
19            id: 0,
20            inner: StreamMap::new(timeout, capacity),
21        }
22    }
23}
24
25impl<O> StreamSet<O>
26where
27    O: Send + 'static,
28{
29    /// Push a stream into the list.
30    ///
31    /// This method adds the given stream to the list.
32    /// If the length of the list is equal to the capacity, this method returns a error that contains the passed stream.
33    /// In that case, the stream is not added to the set.
34    pub fn try_push<F>(&mut self, stream: F) -> Result<(), BoxStream<O>>
35    where
36        F: Stream<Item = O> + Send + 'static,
37    {
38        self.id = self.id.wrapping_add(1);
39
40        match self.inner.try_push(self.id, stream) {
41            Ok(()) => Ok(()),
42            Err(PushError::BeyondCapacity(w)) => Err(w),
43            Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"),
44        }
45    }
46
47    pub fn len(&self) -> usize {
48        self.inner.len()
49    }
50
51    pub fn is_empty(&self) -> bool {
52        self.inner.is_empty()
53    }
54
55    pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> {
56        self.inner.poll_ready_unpin(cx)
57    }
58
59    pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<O, Timeout>>> {
60        let (_, res) = ready!(self.inner.poll_next_unpin(cx));
61
62        Poll::Ready(res)
63    }
64}