observable_combine_changes
The CombineChangesObservable subscribes to two input observables and emits
whenever either of them emits, even if the other hasn't emitted yet.
It wraps downstream signals into the Change<T> enum, where a value is either:
- JustUpdated - When this value's change caused the emission.
- Latest - When this value did not change since the last emission.
- None - When this observable has not emitted yet.
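The three variants can be illustrated with a small standalone model. This is only a sketch of the semantics described above, not the crate's implementation: the `Change<T>` definition here is a hypothetical stand-in (its derives are assumptions), and `CombineChangesState`, `on_left`, and `on_right` are names invented for illustration.

```rust
// Hypothetical stand-in for the crate's Change<T>; the derives are assumptions.
#[derive(Debug, Clone, PartialEq)]
enum Change<T> {
    /// This value's change caused the emission.
    JustUpdated(T),
    /// This value did not change since the last emission.
    Latest(T),
    /// This input has not emitted yet.
    None,
}

// Hypothetical helper (not part of rx_core): remembers the last value
// seen on each of the two inputs.
struct CombineChangesState<A, B> {
    left: Option<A>,
    right: Option<B>,
}

impl<A: Clone, B: Clone> CombineChangesState<A, B> {
    fn new() -> Self {
        Self { left: None, right: None }
    }

    // Called when the first input emits: the left side is JustUpdated,
    // the right side is Latest if it has a value, otherwise None.
    fn on_left(&mut self, value: A) -> (Change<A>, Change<B>) {
        self.left = Some(value.clone());
        let right = match &self.right {
            Some(v) => Change::Latest(v.clone()),
            None => Change::None,
        };
        (Change::JustUpdated(value), right)
    }

    // Called when the second input emits: mirror image of `on_left`.
    fn on_right(&mut self, value: B) -> (Change<A>, Change<B>) {
        self.right = Some(value.clone());
        let left = match &self.left {
            Some(v) => Change::Latest(v.clone()),
            None => Change::None,
        };
        (left, Change::JustUpdated(value))
    }
}
```

Under this model, calling `on_left("Hello!")` on a fresh state yields `(JustUpdated("Hello!"), None)`, and a following `on_right(10)` yields `(Latest("Hello!"), JustUpdated(10))`. Exactly one side of each emitted pair is `JustUpdated`.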
See Also
- CombineLatestObservable - Subscribes to two different observables, and emits the latest of both values when either of them emits. It only starts emitting once both have emitted at least once.
- ZipObservable - Subscribes to two different observables, and emits both values once both of them have emitted, pairing up emissions in the order they happened.
- JoinObservable - Subscribes to two different observables, and emits the latest of both values once both of them have completed.
Example
cargo run -p rx_core --example observable_combine_changes_example
let mut greetings_subject = PublishSubject::<&'static str>::default();
let mut count_subject = PublishSubject::<usize>::default();
let mut subscription = combine_changes(greetings_subject.clone(), count_subject.clone())
.subscribe(PrintObserver::new("combine_changes"));
greetings_subject.next("Hello!");
count_subject.next(10);
count_subject.next(20);
greetings_subject.next("Szia!");
greetings_subject.complete();
count_subject.next(30);
count_subject.complete();
subscription.unsubscribe();
Output:
combine_changes - next: (JustUpdated("Hello!"), None)
combine_changes - next: (Latest("Hello!"), JustUpdated(10))
combine_changes - next: (Latest("Hello!"), JustUpdated(20))
combine_changes - next: (JustUpdated("Szia!"), Latest(20))
combine_changes - next: (Latest("Szia!"), JustUpdated(30))
combine_changes - completed
combine_changes - unsubscribed