futures_concurrency/stream/
stream_ext.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use core::future::IntoFuture;

use crate::stream::{IntoStream, Merge};
use futures_core::Stream;

#[cfg(feature = "alloc")]
use crate::concurrent_stream::FromStream;

use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, WaitUntil, Zip};

/// Extension methods available on every type that implements `Stream`.
///
/// The two-stream combinators (`merge`, `chain`, `zip`) are required methods;
/// `co` and `wait_until` come with default implementations.
pub trait StreamExt: Stream {
    /// Merges this stream with `other`, producing a single stream that yields
    /// the outputs of both.
    ///
    /// Both streams must share the same item type `T`.
    fn merge<T, S2>(self, other: S2) -> Merge2<T, Self, S2::IntoStream>
    where
        Self: Stream<Item = T> + Sized,
        S2: IntoStream<Item = T>;

    /// Chains this stream with `other`, creating a new stream over the items
    /// of both in sequence.
    ///
    /// Both streams must share the same item type `T`.
    fn chain<T, S2>(self, other: S2) -> Chain2<Self, S2::IntoStream>
    where
        Self: Stream<Item = T> + Sized,
        S2: IntoStream<Item = T>;

    /// "Zips up" this stream with `other` into a single stream of pairs.
    ///
    /// As declared here, both streams must share the same item type `T`.
    fn zip<T, S2>(self, other: S2) -> Zip2<Self, S2::IntoStream>
    where
        Self: Stream<Item = T> + Sized,
        S2: IntoStream<Item = T>;

    /// Turns this stream into a concurrent stream by wrapping it in
    /// `FromStream`.
    ///
    /// Only available when the `alloc` feature is enabled.
    #[cfg(feature = "alloc")]
    fn co(self) -> FromStream<Self>
    where
        Self: Sized,
    {
        FromStream::new(self)
    }

    /// Holds back the items of this stream until `deadline` has completed.
    ///
    /// The underlying stream is not polled before the deadline has expired.
    /// The deadline does not have to be a time source: anything that
    /// implements `IntoFuture` is accepted. Combined with a multi-consumer
    /// channel, this can be used to synchronize the start of multiple streams
    /// and futures.
    ///
    /// # Example
    /// ```
    /// # #[cfg(miri)] fn main() {}
    /// # #[cfg(not(miri))]
    /// # fn main() {
    /// use async_io::Timer;
    /// use futures_concurrency::prelude::*;
    /// use futures_lite::{future::block_on, stream};
    /// use futures_lite::prelude::*;
    /// use std::time::{Duration, Instant};
    ///
    /// block_on(async {
    ///     let now = Instant::now();
    ///     let duration = Duration::from_millis(100);
    ///
    ///     stream::once("meow")
    ///         .wait_until(Timer::after(duration))
    ///         .next()
    ///         .await;
    ///
    ///     assert!(now.elapsed() >= duration);
    /// });
    /// # }
    /// ```
    fn wait_until<D>(self, deadline: D) -> WaitUntil<Self, D::IntoFuture>
    where
        Self: Sized,
        D: IntoFuture,
    {
        // Convert the deadline up front so `WaitUntil` receives a future.
        let deadline = deadline.into_future();
        WaitUntil::new(self, deadline)
    }
}

// Blanket implementation: every `Stream` gets the `StreamExt` methods.
// Each combinator forwards to the matching tuple-based trait (`Merge`,
// `Chain`, `Zip`) with a two-element tuple.
impl<S1> StreamExt for S1
where
    S1: Stream,
{
    fn merge<T, S2>(self, other: S2) -> Merge2<T, S1, S2::IntoStream>
    where
        S1: Stream<Item = T>,
        S2: IntoStream<Item = T>,
    {
        // `other` is handed over unconverted; no `into_stream` call is made
        // here, unlike in `chain` and `zip` below.
        let pair = (self, other);
        Merge::merge(pair)
    }

    fn chain<T, S2>(self, other: S2) -> Chain2<Self, S2::IntoStream>
    where
        Self: Stream<Item = T> + Sized,
        S2: IntoStream<Item = T>,
    {
        // TODO(yosh): fix the bounds on the tuple impl
        // Until then, `other` is converted into a stream before it is chained.
        let second = other.into_stream();
        let pair = (self, second);
        Chain::chain(pair)
    }

    fn zip<T, S2>(self, other: S2) -> Zip2<Self, S2::IntoStream>
    where
        Self: Stream<Item = T> + Sized,
        S2: IntoStream<Item = T>,
    {
        // TODO(yosh): fix the bounds on the tuple impl
        // Until then, `other` is converted into a stream before it is zipped.
        let second = other.into_stream();
        let pair = (self, second);
        Zip::zip(pair)
    }
}