operator_exhaust_map
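Map each upstream value to an inner observable, and ignore new inner observables while one is still active. The mapping function still runs for ignored values (the example below logs every attempt), but the output shows no emissions from the inner observables it returns in that case.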
See Also
- ConcatAllOperator - Subscribe to upstream observables one at a time, in order.
- MergeAllOperator - Subscribe to upstream observables and merge their emissions concurrently.
- SwitchAllOperator - Switch to the newest inner observable, unsubscribing previous ones.
- ExhaustAllOperator - Ignore new inner observables while one is active.
- ConcatMapOperator - Map each value to an inner observable and subscribe to them one at a time in order.
- MergeMapOperator - Map each value to an inner observable and merge their emissions concurrently.
- SwitchMapOperator - Map each value to an inner observable and switch to the latest, unsubscribing previous ones.
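The operators listed above differ mainly in what they do with a value that arrives while an earlier inner observable is still running. The following is a rough, std-only sketch of those four policies, not rx_core code: `Strategy`, `completed_runs`, the `(arrival_ms, value)` event tuples and the fixed `inner_ms` duration are all hypothetical names and simplifications introduced for illustration.

```rust
/// Flattening policies named in the list above (a hypothetical model,
/// not rx_core types).
#[derive(Clone, Copy, Debug)]
enum Strategy {
    Concat,
    Merge,
    Switch,
    Exhaust,
}

/// Returns `(value, start_ms)` for every inner run that reaches completion.
/// `events` holds `(arrival_ms, value)` pairs sorted by arrival time, and
/// each inner run is assumed to last exactly `inner_ms`.
fn completed_runs(strategy: Strategy, events: &[(u64, i32)], inner_ms: u64) -> Vec<(i32, u64)> {
    let mut runs = Vec::new();
    let mut busy_until = 0u64; // end of the latest accepted run (Concat, Exhaust)
    for (i, &(at, value)) in events.iter().enumerate() {
        match strategy {
            // Every value runs immediately, overlapping with earlier runs.
            Strategy::Merge => runs.push((value, at)),
            // Every value runs, but waits for the previous run to finish.
            Strategy::Concat => {
                let start = at.max(busy_until);
                busy_until = start + inner_ms;
                runs.push((value, start));
            }
            // A run completes only if no newer value arrives before it ends.
            Strategy::Switch => {
                let interrupted = events
                    .get(i + 1)
                    .map_or(false, |&(next_at, _)| next_at < at + inner_ms);
                if !interrupted {
                    runs.push((value, at));
                }
            }
            // A value is dropped if it arrives while a run is still active.
            Strategy::Exhaust => {
                if at >= busy_until {
                    busy_until = at + inner_ms;
                    runs.push((value, at));
                }
            }
        }
    }
    runs
}

fn main() {
    let events: [(u64, i32); 4] = [(0, 1), (2000, 2), (3000, 3), (3000, 4)];
    for strategy in [Strategy::Concat, Strategy::Merge, Strategy::Switch, Strategy::Exhaust] {
        println!("{:?}: {:?}", strategy, completed_runs(strategy, &events, 3000));
    }
}
```

With these four events and a 3000 ms inner run, the model keeps values 1 and 3 under the exhaust policy, only value 4 under the switch policy, and all four values under the concat and merge policies (delayed under concat, overlapping under merge).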
Example
cargo run -p rx_core --example operator_exhaust_map_example
use std::time::Duration;

use rx_core::prelude::*;
use rx_core_testing::MockExecutor;

fn main() {
    let mut executor = MockExecutor::new_with_logging();
    let scheduler = executor.get_scheduler_handle();

    let mut source = PublishSubject::<i32>::default();

    let mut subscription = source
        .clone()
        .exhaust_map(
            move |next| {
                println!("Trying to switch to the {}. inner observable..", next);
                // Each inner observable emits once per second and completes after 3 values.
                interval(
                    IntervalObservableOptions {
                        duration: Duration::from_millis(1000),
                        max_emissions_per_tick: 10,
                        start_on_subscribe: false,
                    },
                    scheduler.clone(),
                )
                .take(3)
            },
            Never::map_into(),
        )
        .subscribe(PrintObserver::new("exhaust_map"));

    source.next(1);
    executor.tick(Duration::from_millis(1000));
    executor.tick(Duration::from_millis(1000));
    source.next(2); // Ignored while an inner observable is active
    executor.tick(Duration::from_millis(1000));
    source.next(3); // Accepted because the previous inner observable has completed
    source.next(4); // Ignored because the new inner observable has just started
    executor.tick(Duration::from_millis(1000));
    executor.tick(Duration::from_millis(1000));
    source.complete();
    executor.tick(Duration::from_millis(1000));
    source.unsubscribe();
    println!("end");
    subscription.unsubscribe();
}
Output:
Trying to switch to the 1. inner observable..
Ticking... (1s)
exhaust_map - next: 0
Ticking... (1s)
exhaust_map - next: 1
Trying to switch to the 2. inner observable..
Ticking... (1s)
exhaust_map - next: 2
Trying to switch to the 3. inner observable..
Trying to switch to the 4. inner observable..
Ticking... (1s)
exhaust_map - next: 0
Ticking... (1s)
exhaust_map - next: 1
Ticking... (1s)
exhaust_map - next: 2
exhaust_map - completed
exhaust_map - unsubscribed
end
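The emissions in the output above can be cross-checked against a small model of the timeline. This is a hypothetical, std-only sketch rather than rx_core code: `exhaust_map_emissions` and its parameters are invented for illustration, and it assumes each accepted value starts a counter that emits 0, 1, 2, ... once per `period_ms` and completes after `take` emissions, which is how the `interval(...).take(3)` call appears to behave in the example.

```rust
/// Hypothetical model of the example's timeline; not rx_core code.
/// `events` are `(arrival_ms, value)` pairs sorted by arrival time.
/// Returns `(time_ms, emitted_counter)` for every downstream emission.
fn exhaust_map_emissions(events: &[(u64, i32)], period_ms: u64, take: u64) -> Vec<(u64, u64)> {
    let mut emissions = Vec::new();
    let mut busy_until = 0u64; // time at which the active inner counter completes
    for &(at, _value) in events {
        if at < busy_until {
            continue; // an inner counter is still active: drop this value
        }
        // The first emission comes one full period after subscription.
        for n in 0..take {
            emissions.push((at + (n + 1) * period_ms, n));
        }
        busy_until = at + take * period_ms;
    }
    emissions
}

fn main() {
    // Values 1..=4 arrive at the same points in time as in the example.
    let events: [(u64, i32); 4] = [(0, 1), (2000, 2), (3000, 3), (3000, 4)];
    for (time_ms, n) in exhaust_map_emissions(&events, 1000, 3) {
        println!("{} ms: next {}", time_ms, n);
    }
}
```

Under these assumptions the model yields the counter values 0, 1, 2, 0, 1, 2 at one-second intervals, matching the `next` lines in the output: values 2 and 4 contribute nothing because they arrive while a counter is still running.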