pub trait RxExt: Stream {
Show 24 methods
// Provided methods
fn race<S: Stream<Item = Self::Item>>(
self,
other: S,
) -> Race<Self, S, Self::Item>
where Self: Sized { ... }
fn start_with<I: IntoIterator<Item = Self::Item>>(
self,
iter: I,
) -> StartWith<Self>
where Self: Sized { ... }
fn end_with<I: IntoIterator<Item = Self::Item>>(
self,
iter: I,
) -> EndWith<Self>
where Self: Sized { ... }
fn share(self) -> Shared<Self, PublishSubject<Self::Item>>
where Self: Sized { ... }
fn share_behavior(self) -> Shared<Self, BehaviorSubject<Self::Item>>
where Self: Sized { ... }
fn share_replay(self) -> Shared<Self, ReplaySubject<Self::Item>>
where Self: Sized { ... }
fn switch_map<S: Stream, F: FnMut(Self::Item) -> S>(
self,
f: F,
) -> SwitchMap<Self, S, F>
where Self: Sized { ... }
fn pairwise(self) -> Pairwise<Self>
where Self: Sized { ... }
fn debounce<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Debounce<Self, Fut, F>
where Self: Sized { ... }
fn throttle<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Throttle<Self, Fut, F>
where Self: Sized { ... }
fn throttle_trailing<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Throttle<Self, Fut, F>
where Self: Sized { ... }
fn throttle_all<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Throttle<Self, Fut, F>
where Self: Sized { ... }
fn buffer<Fut: Future<Output = bool>, F: FnMut(&Self::Item, usize) -> Fut>(
self,
f: F,
) -> Buffer<Self, Fut, F>
where Self: Sized { ... }
fn window<Fut: Future<Output = bool>, F: FnMut(&Self::Item, usize) -> Fut>(
self,
f: F,
) -> Window<Self, Fut, F>
where Self: Sized { ... }
fn distinct(self) -> Distinct<Self>
where Self: Sized,
Self::Item: Hash { ... }
fn distinct_until_changed(self) -> DistinctUntilChanged<Self>
where Self: Sized,
Self::Item: Hash { ... }
fn materialize(self) -> Materialize<Self>
where Self: Sized { ... }
fn dematerialize<T>(self) -> Dematerialize<Self, T>
where Self: Stream<Item = Notification<T>> + Sized { ... }
fn delay<Fut: Future, F: FnMut() -> Fut>(self, f: F) -> Delay<Self, Fut, F>
where Self: Sized { ... }
fn delay_every<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
max_buffer_size: Option<usize>,
) -> DelayEvery<Self, Fut, F>
where Self: Sized { ... }
fn with_latest_from<S: Stream>(
self,
stream: S,
) -> CombineLatest2<Self, S, Self::Item, S::Item>
where Self: Sized,
Self::Item: ToOwned<Owned = Self::Item>,
S::Item: ToOwned<Owned = S::Item> { ... }
fn timing(self) -> Timing<Self>
where Self: Sized { ... }
fn inspect_done<F: FnMut()>(self, f: F) -> InspectDone<Self, F>
where Self: Sized { ... }
fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>
where Self: Sized { ... }
}
Provided Methods§
Sourcefn race<S: Stream<Item = Self::Item>>(
self,
other: S,
) -> Race<Self, S, Self::Item>where
Self: Sized,
fn race<S: Stream<Item = Self::Item>>(
self,
other: S,
) -> Race<Self, S, Self::Item>where
Self: Sized,
Starts polling itself as well as the provided other Stream
.
The first one to emit an event “wins” and proceeds to emit all its next events.
The “loser” is discarded and will not be polled further.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::{Notification, RxExt};
let stream = stream::iter(0..=3);
let slower_stream = stream::iter(4..=6).delay(|| async { /* return delayed over time */ });
let stream = stream.race(slower_stream);
assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
Sourcefn start_with<I: IntoIterator<Item = Self::Item>>(
self,
iter: I,
) -> StartWith<Self>where
Self: Sized,
fn start_with<I: IntoIterator<Item = Self::Item>>(
self,
iter: I,
) -> StartWith<Self>where
Self: Sized,
Precedes all emitted events with the items of an iter.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::{Notification, RxExt};
let stream = stream::iter(4..=6);
let stream = stream.start_with(0..=3);
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6], stream.collect::<Vec<_>>().await);
Sourcefn end_with<I: IntoIterator<Item = Self::Item>>(self, iter: I) -> EndWith<Self>where
Self: Sized,
fn end_with<I: IntoIterator<Item = Self::Item>>(self, iter: I) -> EndWith<Self>where
Self: Sized,
Follows all emitted events with the items of an iter.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::{Notification, RxExt};
let stream = stream::iter(0..=3);
let stream = stream.end_with(4..=6);
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6], stream.collect::<Vec<_>>().await);
Transforms a Stream
into a broadcast one, which can be subscribed to more than once, after cloning the shared version.
Behavior is exactly like a PublishSubject
, every new subscription will produce a unique Stream
which only emits Event
objects.
An Event
is a helper object which wraps a ref counted value.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::{stream::{StreamExt, self}, future::join};
use futures_rx::{Notification, RxExt};
let stream = stream::iter(0..=3);
let stream = stream.share();
let sub_stream_a = stream.clone().map(|event| *event); // an event is Event here, wrapping a ref counted value
let sub_stream_b = stream.clone().map(|event| *event); // which we can just deref in this case to i32
assert_eq!((vec![0, 1, 2, 3], vec![0, 1, 2, 3]), join(sub_stream_a.collect::<Vec<_>>(), sub_stream_b.collect::<Vec<_>>()).await);
Transforms a Stream
into a broadcast one, which can be subscribed to more than once, after cloning the shared version.
Behavior is exactly like a BehaviorSubject
, where every new subscription will always receive the last emitted event
from the parent Stream
first.
Every new subscription will produce a unique Stream
which only emits Event
objects.
An Event
is a helper object which wraps a ref counted value.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::{stream::{StreamExt, self}, future::join};
use futures_rx::{Notification, RxExt};
let stream = stream::iter(0..=3);
let stream = stream.share_behavior();
stream.clone().collect::<Vec<_>>().await; // consume all events beforehand
let sub_stream_a = stream.clone().map(|event| *event); // an event is Event here, wrapping a ref counted value
let sub_stream_b = stream.clone().map(|event| *event); // which we can just deref in this case to i32
assert_eq!(
(vec![3], vec![3]),
join(
sub_stream_a.collect::<Vec<_>>(),
sub_stream_b.collect::<Vec<_>>()
)
.await
);
Transforms a Stream
into a broadcast one, which can be subscribed to more than once, after cloning the shared version.
Behavior is exactly like a ReplaySubject
, where every new subscription will always receive all previously emitted events
from the parent Stream
first.
Every new subscription will produce a unique Stream
which only emits Event
objects.
An Event
is a helper object which wraps a ref counted value.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::{stream::{StreamExt, self}, future::join};
use futures_rx::{Notification, RxExt};
let stream = stream::iter(0..=3);
let stream = stream.share_replay();
stream.clone().collect::<Vec<_>>().await; // consume all events beforehand
let sub_stream_a = stream.clone().map(|event| *event); // an event is Event here, wrapping a ref counted value
let sub_stream_b = stream.clone().map(|event| *event); // which we can just deref in this case to i32
assert_eq!(
(vec![0, 1, 2, 3], vec![0, 1, 2, 3]),
join(
sub_stream_a.collect::<Vec<_>>(),
sub_stream_b.collect::<Vec<_>>()
)
.await
);
Sourcefn switch_map<S: Stream, F: FnMut(Self::Item) -> S>(
self,
f: F,
) -> SwitchMap<Self, S, F>where
Self: Sized,
fn switch_map<S: Stream, F: FnMut(Self::Item) -> S>(
self,
f: F,
) -> SwitchMap<Self, S, F>where
Self: Sized,
Like flat_map
, except that switched Stream
is interrupted when the parent Stream
emits a next event.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::{Notification, RxExt};
let stream = stream::iter(0..=3);
let stream = stream.switch_map(|event| stream::iter([event + 10, event - 10]));
assert_eq!(vec![10, 11, 12, 13, -7], stream.collect::<Vec<_>>().await);
Sourcefn pairwise(self) -> Pairwise<Self>where
Self: Sized,
fn pairwise(self) -> Pairwise<Self>where
Self: Sized,
Emits pairs of the previous and next events as a tuple.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
The next value in the tuple is a value reference, and therefore wrapped inside an Event
struct.
An Event
is a helper object for ref counted events.
As the next event will also need to be emitted as the previous event in the next pair,
it is first made available as next using a ref count - Event
.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::{Notification, RxExt};
let stream = stream::iter(0..=3);
let stream = stream.pairwise();
let stream = stream.map(|(prev, next)| (prev, *next)); // we can deref here to i32
assert_eq!(vec![(0, 1), (1, 2), (2, 3)], stream.collect::<Vec<_>>().await);
Sourcefn debounce<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Debounce<Self, Fut, F>where
Self: Sized,
fn debounce<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Debounce<Self, Fut, F>where
Self: Sized,
Delays events using a debounce time window. The event will emit when this window closes and when no other event was emitted while this window was open.
The provided closure is executed over all elements of this stream as
they are made available. It is executed inline with calls to
poll_next
.
The debounce window resets on every newly emitted event.
On next, the closure is invoked and a reference to the event is passed.
The closure needs to return a Future
, which represents the next debounce window over time.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
Sourcefn throttle<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Throttle<Self, Fut, F>where
Self: Sized,
fn throttle<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Throttle<Self, Fut, F>where
Self: Sized,
Creates a new interval from the closure, whenever a new event is emitted from the parent Stream
.
This event is immediately emitted, however for as long as the interval is now open, no
subsequent events will be emitted.
When the interval closes and the parent Stream
emits a new event, this
process repeats.
The provided closure is executed over all elements of this stream as
they are made available. It is executed inline with calls to
poll_next
.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
See also sample
Sourcefn throttle_trailing<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Throttle<Self, Fut, F>where
Self: Sized,
fn throttle_trailing<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Throttle<Self, Fut, F>where
Self: Sized,
Like throttle
, but only emitting trailing items.
Sourcefn throttle_all<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Throttle<Self, Fut, F>where
Self: Sized,
fn throttle_all<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
) -> Throttle<Self, Fut, F>where
Self: Sized,
Like throttle
, but emitting both leading and trailing items.
Sourcefn buffer<Fut: Future<Output = bool>, F: FnMut(&Self::Item, usize) -> Fut>(
self,
f: F,
) -> Buffer<Self, Fut, F>where
Self: Sized,
fn buffer<Fut: Future<Output = bool>, F: FnMut(&Self::Item, usize) -> Fut>(
self,
f: F,
) -> Buffer<Self, Fut, F>where
Self: Sized,
Creates chunks of buffered data.
The provided closure is executed over all elements of this stream as
they are made available. It is executed inline with calls to
poll_next
.
You can use a reference to the current event, or the count of the current buffer to determine when a chunk should close and emit next.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use std::collections::VecDeque;
use futures::stream::{self, StreamExt};
use futures_rx::RxExt;
let stream = stream::iter(0..9);
let stream = stream.buffer(|_, count| async move { count == 3 });
assert_eq!(
vec![
VecDeque::from_iter([0, 1, 2]),
VecDeque::from_iter([3, 4, 5]),
VecDeque::from_iter([6, 7, 8])
],
stream.collect::<Vec<_>>().await
);
Sourcefn window<Fut: Future<Output = bool>, F: FnMut(&Self::Item, usize) -> Fut>(
self,
f: F,
) -> Window<Self, Fut, F>where
Self: Sized,
fn window<Fut: Future<Output = bool>, F: FnMut(&Self::Item, usize) -> Fut>(
self,
f: F,
) -> Window<Self, Fut, F>where
Self: Sized,
Creates chunks of buffered data as new Stream
s.
The provided closure is executed over all elements of this stream as
they are made available. It is executed inline with calls to
poll_next
.
You can use a reference to the current event, or the count of the current buffer to determine when a chunk should close and emit next.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::RxExt;
let stream = stream::iter(0..9);
let stream = stream.window(|_, count| async move { count == 3 }).flat_map(|it| it);
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7, 8], stream.collect::<Vec<_>>().await);
Sourcefn distinct(self) -> Distinct<Self>
fn distinct(self) -> Distinct<Self>
Ensures that all emitted events are unique.
Events are required to implement Hash
.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::RxExt;
let stream = stream::iter([1, 2, 1, 3, 2, 2, 1, 4]);
let stream = stream.distinct();
assert_eq!(vec![1, 2, 3, 4], stream.collect::<Vec<_>>().await);
Sourcefn distinct_until_changed(self) -> DistinctUntilChanged<Self>
fn distinct_until_changed(self) -> DistinctUntilChanged<Self>
Ensures that all emitted events are unique within immediate sequence.
Events are required to implement Hash
.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::RxExt;
let stream = stream::iter([1, 1, 1, 2, 2, 2, 3, 1, 1]);
let stream = stream.distinct_until_changed();
assert_eq!(vec![1, 2, 3, 1], stream.collect::<Vec<_>>().await);
Sourcefn materialize(self) -> Materialize<Self>where
Self: Sized,
fn materialize(self) -> Materialize<Self>where
Self: Sized,
Converts all events of a Stream
into Notification
events.
When the Stream
is done, it will first emit a final Notification::Complete
event.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::{Notification, RxExt};
let stream = stream::iter(0..=3);
let stream = stream.materialize();
assert_eq!(
vec![
Notification::Next(0),
Notification::Next(1),
Notification::Next(2),
Notification::Next(3),
Notification::Complete
],
stream.collect::<Vec<_>>().await
);
Sourcefn dematerialize<T>(self) -> Dematerialize<Self, T>
fn dematerialize<T>(self) -> Dematerialize<Self, T>
The inverse of materialize.
Use this transformer to translate a Stream
emitting Notification
events back
into a Stream
emitting original events.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::RxExt;
let stream = stream::iter(0..=3);
let stream = stream.materialize().dematerialize();
assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
Sourcefn delay<Fut: Future, F: FnMut() -> Fut>(self, f: F) -> Delay<Self, Fut, F>where
Self: Sized,
fn delay<Fut: Future, F: FnMut() -> Fut>(self, f: F) -> Delay<Self, Fut, F>where
Self: Sized,
Delays emitting events using an initial time window, provided by a closure.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::RxExt;
let stream = stream::iter(0..=3);
let stream = stream.delay(|| async { /* return delayed over time */ });
assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
Sourcefn delay_every<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
max_buffer_size: Option<usize>,
) -> DelayEvery<Self, Fut, F>where
Self: Sized,
fn delay_every<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
self,
f: F,
max_buffer_size: Option<usize>,
) -> DelayEvery<Self, Fut, F>where
Self: Sized,
Delays every event using a time window, provided by a closure.
Use max_buffer_size to limit the amount of buffered items that are awaiting time window(s) to complete.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::RxExt;
let stream = stream::iter(0..=3);
let stream = stream.delay_every(|_| async { /* return delayed over time */ }, None);
assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
Sourcefn with_latest_from<S: Stream>(
self,
stream: S,
) -> CombineLatest2<Self, S, Self::Item, S::Item>
fn with_latest_from<S: Stream>( self, stream: S, ) -> CombineLatest2<Self, S, Self::Item, S::Item>
Acts just like a CombineLatest2
, where every next event is a tuple pair
containing the last emitted events from both Stream
s.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::RxExt;
let stream = stream::iter(0..=3);
let stream = stream.with_latest_from(stream::iter(0..=3));
assert_eq!(vec![(0, 0), (1, 1), (2, 2), (3, 3)], stream.collect::<Vec<_>>().await);
Sourcefn timing(self) -> Timing<Self>where
Self: Sized,
fn timing(self) -> Timing<Self>where
Self: Sized,
Wraps each item into a Timed
struct.
This structs hold the actual event, as well as
a timestamp containing an Instant
and an elapsed interval
as Duration
, relative to the second to last emitted event.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
Sourcefn inspect_done<F: FnMut()>(self, f: F) -> InspectDone<Self, F>where
Self: Sized,
fn inspect_done<F: FnMut()>(self, f: F) -> InspectDone<Self, F>where
Self: Sized,
Similar to inspect
, except that the closure provided is only ever
triggered when the Stream
is done.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
§Examples
use futures::stream::{self, StreamExt};
use futures_rx::RxExt;
let mut is_done = false;
stream::iter(0..=8)
.inspect_done(|| is_done = true)
.collect::<Vec<_>>()
.await;
assert!(is_done);
Sourcefn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>where
Self: Sized,
fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>where
Self: Sized,
Only emits events whenever the sampler
emits an event.
The event emitted is then the last emitted event from the
source Stream
.
If the sampler
triggers before the source Stream
was
able to produce a new event, then no event is emitted.
Note that this function consumes the stream passed into it and returns a wrapped version of it.
See also throttle
.