RxRs is a lightweight Rx implementation that builds upon futures::Stream.
It aims to provide Subjects, which allow multiple Streams to subscribe to the same source. Events are reference-counted on their way to the downstream subscriber(s).
The subjects are:
PublishSubject
BehaviorSubject
ReplaySubject
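To illustrate what reference-counted events mean when several subscribers listen to one source, here is a minimal synchronous sketch that uses only the standard library. `ToySubject` and `ToySubscription` are hypothetical names invented for this illustration; they are not this crate's API, and the real subjects are asynchronous Streams. The sketch assumes publish-style behavior, where a subscriber only sees events sent after it subscribed.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Hypothetical toy type, not part of this crate: a synchronous broadcaster
/// that hands every subscriber an `Rc` to the same event.
struct ToySubject<T> {
    queues: Vec<Rc<RefCell<VecDeque<Rc<T>>>>>,
}

/// Hypothetical receiving end for `ToySubject`.
struct ToySubscription<T> {
    queue: Rc<RefCell<VecDeque<Rc<T>>>>,
}

impl<T> ToySubject<T> {
    fn new() -> Self {
        ToySubject { queues: Vec::new() }
    }

    /// Registers a new subscriber; it only sees events sent after this call.
    fn subscribe(&mut self) -> ToySubscription<T> {
        let queue = Rc::new(RefCell::new(VecDeque::new()));
        self.queues.push(Rc::clone(&queue));
        ToySubscription { queue }
    }

    /// Wraps the event in a single `Rc` and pushes a clone of the pointer
    /// (not of the value) to every subscriber.
    fn next(&mut self, value: T) {
        let event = Rc::new(value);
        for queue in &self.queues {
            queue.borrow_mut().push_back(Rc::clone(&event));
        }
    }
}

impl<T> ToySubscription<T> {
    /// Takes the oldest pending event, if any.
    fn recv(&self) -> Option<Rc<T>> {
        self.queue.borrow_mut().pop_front()
    }
}

fn main() {
    let mut subject = ToySubject::new();
    let a = subject.subscribe();
    let b = subject.subscribe();

    subject.next(String::from("hello"));

    let from_a = a.recv().expect("a received the event");
    let from_b = b.recv().expect("b received the event");

    // Both subscribers point at the same allocation; the String was never cloned.
    assert!(Rc::ptr_eq(&from_a, &from_b));
    assert_eq!(*from_a, "hello");
}
```

Sharing a pointer instead of cloning the payload keeps the cost of an extra subscriber independent of the size of the event.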
Then there are the combinators:
CombineLatest2..CombineLatest9
Zip2..Zip9
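The two combinator families differ in how they pair events. The sketch below models the conventional Rx semantics on a plain, already-ordered timeline of emissions, using only the standard library. The `Emission` enum and both functions are hypothetical helpers written for this illustration, not this crate's API, and the exact ordering produced by the real asynchronous combinators may differ.

```rust
/// Hypothetical event type for this sketch: which source emitted, and what.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Emission {
    Left(i32),
    Right(char),
}

/// Combine-latest: once both sides have emitted at least once, every new
/// emission produces a pair made of the latest value from each side.
fn combine_latest(timeline: &[Emission]) -> Vec<(i32, char)> {
    let mut left = None;
    let mut right = None;
    let mut out = Vec::new();
    for emission in timeline {
        match *emission {
            Emission::Left(v) => left = Some(v),
            Emission::Right(c) => right = Some(c),
        }
        if let (Some(l), Some(r)) = (left, right) {
            out.push((l, r));
        }
    }
    out
}

/// Zip: the n-th emission of one side is paired with the n-th of the other.
fn zip(timeline: &[Emission]) -> Vec<(i32, char)> {
    let lefts = timeline.iter().filter_map(|e| match e {
        Emission::Left(v) => Some(*v),
        _ => None,
    });
    let rights = timeline.iter().filter_map(|e| match e {
        Emission::Right(c) => Some(*c),
        _ => None,
    });
    lefts.zip(rights).collect()
}

fn main() {
    use Emission::{Left, Right};
    let timeline = [Left(1), Left(2), Right('a'), Left(3), Right('b')];

    // Nothing is emitted until the right side produces its first value.
    assert_eq!(combine_latest(&timeline), vec![(2, 'a'), (3, 'a'), (3, 'b')]);

    // The unmatched third left value is never paired.
    assert_eq!(zip(&timeline), vec![(1, 'a'), (2, 'b')]);
}
```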
It also exposes RxExt, which, like StreamExt, provides typical Rx transformers.
The ops so far are:
buffer
debounce
delay
dematerialize
distinct
distinct_until_changed
end_with
materialize
pairwise
race
share
share_behavior
share_replay
start_with
switch_map
throttle
window
with_latest_from
Note that many other Rx operators are already part of the futures::StreamExt trait. This crate will only ever contain Rx operators that are missing from StreamExt.
Use both StreamExt and RxExt to access the full set of operators.
Subject example
let mut subject = new;
subject.next;
subject.next;
subject.next;
subject.close;
let obs = subject.subscribe;
// You can subscribe multiple times
let another_obs = subject.subscribe;
block_on;
CombineLatest example
let s1 = iter;
let s2 = iter;
let s3 = iter;
let stream = new;
block_on;
Operators
// ops are accessible via the RxExt trait and work on futures::Stream
let stream = iter
.start_with // precede the emission with event from an Iter
.with_latest_from // combine latest of Self and another stream
.distinct_until_changed // avoid repeating the same exact event in immediate sequence
.buffer // buffer every 3 events emitted
.debounce
.pairwise // previous and next events side-by-side
.share_behavior; // convert into a broadcast Stream and for every new subscription, start by emitting the last emitted event
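As a rough illustration of what the comments on distinct_until_changed and pairwise describe, the following sketch applies the same ideas to plain slices with only the standard library. The two functions are hypothetical stand-ins written for this example; they are not the crate's operators, which work on Streams. Treating the first item as having no predecessor in `pairwise` is an assumption of this sketch.

```rust
/// Drops an item when it equals the one immediately before it.
fn distinct_until_changed<T: PartialEq + Clone>(items: &[T]) -> Vec<T> {
    let mut out: Vec<T> = Vec::new();
    for item in items {
        if out.last() != Some(item) {
            out.push(item.clone());
        }
    }
    out
}

/// Pairs each item with its predecessor. In this sketch the first item has
/// no predecessor, so n items yield n - 1 pairs (an assumption, see above).
fn pairwise<T: Clone>(items: &[T]) -> Vec<(T, T)> {
    items
        .windows(2)
        .map(|w| (w[0].clone(), w[1].clone()))
        .collect()
}

fn main() {
    let input = [1, 1, 2, 2, 2, 3, 1];

    // Immediate repeats are removed; the later 1 survives because it
    // does not directly follow another 1.
    let distinct = distinct_until_changed(&input);
    assert_eq!(distinct, vec![1, 2, 3, 1]);

    // Each remaining event is shown next to the one before it.
    let pairs = pairwise(&distinct);
    assert_eq!(pairs, vec![(1, 2), (2, 3), (3, 1)]);
}
```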