Introduction
Reactive Extensions for Bevy!
rx_bevy brings observables and composable operators to make event
orchestration a breeze!
Use Observables to serve as event sources, and subscribe to them to spawn active subscriptions that send events to Observers!
Use combination observables to combine other event sources, and use operators to transform these events and their behavior!
rx_core / rx_bevy
While this project was built with the purpose of bringing reactive extensions to
Bevy specifically, its core was designed to be framework agnostic. As such,
all generic observables, operators and traits are shipped from the rx_core crate
and are prefixed with rx_core. rx_bevy re-exports the entirety of rx_core,
and adds Bevy specific observables and operators along with the context
implementation necessary to integrate with bevy_ecs.
This does not mean that rx_bevy is not a Bevy first project, as it suffers no downsides from keeping its core framework agnostic. In fact, many core trait design decisions were made to facilitate easy integration with ECS.
Book Contents
To learn the core concepts used throughout this project like what an Observable is, take a look at the Concepts page.
Observables, Operators and Subjects
Learn more about individual low level components from their readme files, accessible right here from the book.
Every construct implemented follows the traditional Rx names, and behavior. So if you’ve used an Rx library before, even in another language, your knowledge applies here too.
rx_bevy specifics
Once you know the core concepts and what each component offers, learn how to use them within Bevy at the Usage Within Bevy page.
Examples
Toggle a timer’s speed by keyboard
In this example, the KeyboardObservable’s subscription emits just_pressed
KeyCodes, and the filter operator limits them to four keys (Digit1 to Digit4).
Then switch_map creates an internal subscription to an IntervalObservable whose
speed depends on the KeyCode observed. The scan operator ignores the values
emitted by the interval (they restart on every new KeyCode) and counts the
number of emissions instead. The result is a counter whose speed changes based
on the key pressed.
Try this example in the observable gallery! Press L to subscribe/unsubscribe!
cargo run --example observable_gallery --features example
let interval_observable = commands
.spawn((
Name::new("IntervalObservable"),
KeyboardObservable::default()
.filter(|key_code| {
matches!(
key_code,
KeyCode::Digit1 | KeyCode::Digit2 | KeyCode::Digit3 | KeyCode::Digit4
)
})
.switch_map(|key_code| {
let duration = match key_code {
KeyCode::Digit1 => Duration::from_millis(5),
KeyCode::Digit2 => Duration::from_millis(100),
KeyCode::Digit3 => Duration::from_millis(500),
KeyCode::Digit4 => Duration::from_millis(2000),
_ => unreachable!(),
};
IntervalObservable::new(IntervalObservableOptions {
duration,
start_on_subscribe: false,
max_emissions_per_tick: 4,
})
})
.scan(|acc, _next| acc + 1, 0_usize)
.into_component(),
))
.id();
let example_observer = commands
.spawn(Name::new("ExampleObserver"))
.observe(print_next_observer::<usize>)
.id();
let subscription_entity = commands.subscribe::<usize, (), Update>(
interval_observable,
example_observer,
);
Concepts & Terminology
Before diving into the individual observables and operators, let’s go through all the concepts and nomenclature you might encounter, and their definitions.
Pretty much everything within the repository assumes that you already know, to some degree, what each of these means and does.
Concept Hierarchy
Observer (Destination)
The simplest concept, and the one that needs immediate clarification, is the
observer, because in the context of rx_bevy it is not the same thing as a
Bevy observer!
An RxObserver is something that implements three functions for its 3 observer “channels” via the RxObserver trait:
- next
- error
- complete
Channels & Signals
Functions on the Observer trait can be thought of as channels, carriers of signals, each with a different purpose and behavior:
- The next channel carries the value signal, the useful content you want to observe.
- The error channel carries errors separately, to enable error handling independently of the values.
  If you’re curious about why errors are on a separate channel instead of just using Results, see the “Why do errors have their own channel?” section.
- The complete channel carries the completion signal. It signals that no more next or error emissions will come. This signal does not carry any value, and it is usually sent right after the last next signal.
Emission
The act of emitting a signal.
Inputs
Observers are things that can receive values, therefore they define their
input types using the ObserverInput trait. These types define the values
that are received by the next and error functions.
pub trait ObserverInput {
type In: Signal;
type InError: Signal;
}
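To make the three channels concrete, here is a minimal, std-only sketch. The `Observer` trait, `LogObserver` and `emit_all` below are hypothetical simplifications written for this page, not the crate's `RxObserver` API; they only show how a destination receives values, errors and completion through separate functions.

```rust
/// Hypothetical, simplified observer trait (not the real `RxObserver`).
trait Observer<T, E> {
    fn next(&mut self, value: T);
    fn error(&mut self, error: E);
    fn complete(&mut self);
}

/// Records every signal it receives as a line of text.
#[derive(Default)]
struct LogObserver {
    log: Vec<String>,
}

impl Observer<i32, String> for LogObserver {
    fn next(&mut self, value: i32) {
        self.log.push(format!("next: {value}"));
    }

    fn error(&mut self, error: String) {
        self.log.push(format!("error: {error}"));
    }

    fn complete(&mut self) {
        self.log.push("completed".to_string());
    }
}

/// Drives an observer the way a finite source would: all values, then completion.
fn emit_all(observer: &mut impl Observer<i32, String>, values: &[i32]) {
    for &value in values {
        observer.next(value);
    }
    observer.complete();
}
```

Calling `emit_all(&mut observer, &[1, 2])` on a fresh `LogObserver` leaves two `next` lines followed by one `completed` line in its log.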
Notifications
In some places you may encounter signals referred to as notifications. The distinction is that notifications are the materialized form of signals.
This is useful whenever you want to materialize all the different kinds of signals of something into one value, whichever signal that may have been. For example when sending them as an event, or serializing them.
This could be an enum like the ObserverNotification:
pub enum ObserverNotification<In, InError>
where
In: Signal,
InError: Signal,
{
Next(In),
Error(InError),
Complete,
}
Or as an event used in Bevy like the SubscriberNotificationEvent
Observable
You may already think of Observables as things that emit signals, but that’s not actually (strictly) true!
Observables are things that you can subscribe to with an observer as the destination of the resulting subscription! This resulting subscription is the source of signals!
Therefore, an observable is more like a piece of configuration based on which actual subscriptions can be created.
Some observables emit their values immediately and only return an already closed “inert” subscription. For them, technically speaking, it was the observable that emitted those signals. For example, the of and the iterator observables both complete immediately on subscription.
This may seem like a superficial distinction to make as it still is the observable that you directly interact with, but it is important to understand how they work.
If we know that the state is always part of a subscription and not the observable, it’s clear that you can subscribe to the same observable multiple times, and all subscriptions are going to be unique, independent “instances”, with their own state!
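The following std-only sketch illustrates the "observable as configuration, subscription as state" idea. `RunningTotal` and `RunningTotalSubscription` are hypothetical names invented for this illustration, and values are pushed in by hand instead of coming from a real source, so treat it as a model of the concept and not as the crate's API.

```rust
/// Hypothetical observable: pure configuration, it holds no running state.
struct RunningTotal {
    start: i32,
}

/// Hypothetical subscription: owns the state of one independent "instance".
struct RunningTotalSubscription {
    total: i32,
}

impl RunningTotal {
    /// Every call creates a fresh subscription with its own state.
    fn subscribe(&self) -> RunningTotalSubscription {
        RunningTotalSubscription { total: self.start }
    }
}

impl RunningTotalSubscription {
    /// Feeds one value in by hand and returns what would be emitted downstream.
    fn push(&mut self, value: i32) -> i32 {
        self.total += value;
        self.total
    }
}
```

Two subscriptions created from the same `RunningTotal` never see each other's totals, because the only thing they share is the configuration they were created from.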
Outputs
Observables define the types of what their subscriptions may emit, what errors (if any) they may produce:
pub trait ObservableOutput {
type Out: Signal;
type OutError: Signal;
}
Observables that do not emit errors (or values) use the Never type. Since Never is an empty enum, it is impossible to create a value of it! This ensures that if something says it won’t error, then it really can’t. (The Never type is actually an alias to the Infallible type used with the Result type!)
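As a small illustration of why an empty enum is a guarantee and not just a convention, the sketch below uses the standard library's `Infallible` and a hypothetical `MyNever` enum defined only for this example. In both cases the error arm needs no code, because no value can ever reach it.

```rust
use std::convert::Infallible;

/// Hypothetical stand-in for an empty "never" type: no variants, so no values.
enum MyNever {}

/// The `Err` arm is handled by an empty match: there is nothing to match on.
fn unwrap_infallible<T>(result: Result<T, Infallible>) -> T {
    match result {
        Ok(value) => value,
        Err(never) => match never {},
    }
}

/// The same works for any empty enum, such as the one defined above.
fn unwrap_my_never<T>(result: Result<T, MyNever>) -> T {
    match result {
        Ok(value) => value,
        Err(never) => match never {},
    }
}
```

Neither function can panic: the compiler accepts the empty `match` only because the error type has no possible values.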
Example: Subscribing to an Observable
This example demonstrates a subscription of an observable using the
PrintObserver as the destination. Each value will be emitted immediately, but
one by one, and then complete.
let iterator_observable = IteratorObservable::new(1..=4);
let subscription = iterator_observable
.subscribe(PrintObserver::new("iterator_observable"));
Output:
iterator_observable - next: 1
iterator_observable - next: 2
iterator_observable - next: 3
iterator_observable - next: 4
iterator_observable - completed
iterator_observable - unsubscribed
Subscription
From the observable’s perspective, a subscription is an “instance” of an
observable. The most important function on it is the unsubscribe function,
which will stop the subscription, closing it.
From the subscription’s own perspective, it’s a value that represents owned
resources (state, teardown functions) that you can release by calling
unsubscribe.
The add and add_teardown methods on subscriptions let you add additional things into the subscription that will also be unsubscribed when the subscription unsubscribes. These can be other subscriptions, or just simple callbacks, aka teardowns.
A very important thing to learn here is that everything else (observables, operators, observers) is there just to create subscriptions. This is where state resides!
Teardown
A teardown function is an FnOnce that can be part of a subscription and will
be called on unsubscribe.
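A subscription that owns teardowns can be sketched with nothing but the standard library. The `Subscription` type below is a hypothetical simplification written for this page; in particular, running a teardown immediately when it is added to an already closed subscription is an assumption borrowed from other Rx implementations, not something this book states.

```rust
/// Hypothetical, simplified subscription: a bag of teardowns run once on unsubscribe.
#[derive(Default)]
struct Subscription {
    teardowns: Vec<Box<dyn FnOnce()>>,
    closed: bool,
}

impl Subscription {
    /// Stores a teardown to be called when the subscription is unsubscribed.
    fn add_teardown(&mut self, teardown: impl FnOnce() + 'static) {
        if self.closed {
            // Assumption: a closed subscription releases new resources right away.
            teardown();
        } else {
            self.teardowns.push(Box::new(teardown));
        }
    }

    /// Closes the subscription and runs every stored teardown exactly once.
    fn unsubscribe(&mut self) {
        if self.closed {
            return;
        }
        self.closed = true;
        for teardown in self.teardowns.drain(..) {
            teardown();
        }
    }
}
```

Because each teardown is an `FnOnce` that is drained out of the list, calling `unsubscribe` a second time is a no-op.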
Combination Observables
Some observables are there to combine other observables into one. As each input observable can emit as many signals as they want, at their own pace, or maybe even never, there are many ways to combine two observables.
Some examples are:
- MergeObservable: A tuple of observables of common output types, that emit their values concurrently into a common stream of events.
- ConcatObservable: A tuple of observables of common output types, that subscribes to one observable at a time, waits until it completes and then subscribes to the next one, in order. (Has the exact same behavior as a MergeObservable with a concurrency_limit of 1!)
- CombineLatestObservable: Two observables emit into a tuple of each observable’s output type ((O1::Out, O2::Out)) when any of them emits, but only after each had at least one emission, aka primed.
- ZipObservable: Two observables emit into a tuple of each observable’s output type ((O1::Out, O2::Out)) when, for each emission, there is one from the other observable. The first emission of O1 will always be paired with the first emission of O2, the second emissions will be emitted together, and so on.
  This can lead to an excessive build up of events when one is emitting fast and the other one is slow. The buffering behavior can be controlled by its options value.
Currently only 2 observables can be combined by each combinator. If you want more, just nest more of them together. (Or help implement varargs.)
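The difference between the combine-latest and zip strategies described above can be modeled on a plain list of events. This is a std-only simulation, not the crate's implementation: the `Event` enum and both functions are hypothetical, and the slice order stands in for the order in which the two sources emitted.

```rust
use std::collections::VecDeque;

/// One emission from either of two hypothetical sources, in arrival order.
#[derive(Clone, Copy)]
enum Event {
    Left(char),
    Right(u32),
}

/// Emits the latest pair on every event, but only once both sides are primed.
fn combine_latest(events: &[Event]) -> Vec<(char, u32)> {
    let (mut left, mut right) = (None, None);
    let mut out = Vec::new();
    for &event in events {
        match event {
            Event::Left(value) => left = Some(value),
            Event::Right(value) => right = Some(value),
        }
        if let (Some(l), Some(r)) = (left, right) {
            out.push((l, r));
        }
    }
    out
}

/// Pairs the n-th left value with the n-th right value; unmatched values wait.
fn zip(events: &[Event]) -> Vec<(char, u32)> {
    let (mut left, mut right) = (VecDeque::new(), VecDeque::new());
    let mut out = Vec::new();
    for &event in events {
        match event {
            Event::Left(value) => left.push_back(value),
            Event::Right(value) => right.push_back(value),
        }
        if left.is_empty() || right.is_empty() {
            continue;
        }
        if let (Some(l), Some(r)) = (left.pop_front(), right.pop_front()) {
            out.push((l, r));
        }
    }
    out
}
```

For the event order `a, b, 1, 2, c`, the combine-latest model yields `(b, 1), (b, 2), (c, 2)` while the zip model yields `(a, 1), (b, 2)` and keeps `c` buffered, which is the build up mentioned above.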
Primed
Some observables do not emit anything until they are primed. For every subscription, this can happen at most once, and remains true for the remainder of its duration.
For example, some combination observables like combine_latest and zip
emit values taken from all of their input observables, so it’s impossible
for them to emit anything until this condition is met. Once it is met, the
subscription to them can be considered primed, and expected to emit values.
Where priming matters is completion. If an upstream completion prevents priming, downstream should immediately complete too. Once primed, the condition to complete will depend on the observable.
Operators
Operators take an observable as input and return a new observable as output, enhancing the original observable with new behavior.
Composable Operators
Composable Operators are a subset of regular Operators. Unlike - for
example - the retry operator, that (as the name suggests) retries
subscription to the source, many other operators do not interact with their
source observable beyond just subscribing to them once.
All a composable operator does is one or both of the following:
- Wrap the destination into a subscriber on subscribe
- Interact with the destination on subscribe
The start_with and finalize operators don’t create anything new on subscribe, they only interact with the destination subscriber.
But they don’t know anything about who the source observable is.
Why though?
This enables 2 things:
- Simpler implementation for a lot of operators!
  Composable operators skip implementing the actual operator that stores the source observable it wraps. This layer is auto implemented using the Pipe operator/observable, whose sole job is to combine a source observable and a composable operator.
- Composite operators (behind the compose feature)!
  Composite operators are (composable) operators made from other composable operators!
let my_operator = compose_operator::<i32, Never>()
    .map(|i| i * 2)
    .filter(|i| i < &4);
  Composite operators are a convenient way of creating a new operator without having to implement one from scratch. The obvious limitation is that they can only use the composable subset of operators. So no retry, no share.
Subscribers
A subscriber is something that’s both an observer and a subscription at the same time!
Most of the time, they wrap another observer or subscriber, which means you can have a deeply nested series of subscribers, in which the deepest element is usually a regular observer, the true destination. And this whole nested structure lives in a subscription!
A single subscriber usually implements a single, easily understandable behavior, that it applies by reacting to upstream emissions, and producing different downstream emissions.
Downstream & Upstream from the Subscriber’s Perspective
In the context of observables and operators, downstream refers to the
destination, where signals are sent, and upstream refers to the source,
the caller of the next, error and complete functions.
For example, looking at the map operator’s next implementation:
fn next(
&mut self,
next: Self::In, // This is coming from upstream
) {
let mapped = (self.mapper)(next);
self.destination.next(mapped); // And this is sending it downstream
}
Downstream & Upstream from the Operator’s Perspective
If we zoom out where this operator is used:
let _s = (1..=5)
.into_observable() // Relative to the `map` operator, this `IteratorObservable` is upstream
.map(|i| i * 2)
.skip(1) // And this `skip` operator is downstream
.subscribe(PrintObserver::new("map_operator")); // The `PrintObserver` is also downstream relative to `map`.
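The nesting of subscribers behind a pipeline like the one above can be sketched with plain structs. Everything below (`Observer`, `Collect`, `MapSubscriber`, `SkipSubscriber`, `run_pipeline`) is a hypothetical, std-only simplification limited to the `next` channel; it only illustrates how each subscriber wraps its downstream destination.

```rust
/// Hypothetical, simplified observer trait with only the `next` channel.
trait Observer<T> {
    fn next(&mut self, value: T);
}

/// The true destination: stores whatever reaches it.
struct Collect(Vec<i32>);

impl Observer<i32> for Collect {
    fn next(&mut self, value: i32) {
        self.0.push(value);
    }
}

/// Subscriber of a `map`-like operator: transforms, then forwards downstream.
struct MapSubscriber<D, F> {
    destination: D,
    mapper: F,
}

impl<D: Observer<i32>, F: FnMut(i32) -> i32> Observer<i32> for MapSubscriber<D, F> {
    fn next(&mut self, value: i32) {
        let mapped = (self.mapper)(value);
        self.destination.next(mapped);
    }
}

/// Subscriber of a `skip`-like operator: drops the first `remaining` values.
struct SkipSubscriber<D> {
    destination: D,
    remaining: usize,
}

impl<D: Observer<i32>> Observer<i32> for SkipSubscriber<D> {
    fn next(&mut self, value: i32) {
        if self.remaining > 0 {
            self.remaining -= 1;
        } else {
            self.destination.next(value);
        }
    }
}

/// Feeds 1..=5 through map(|i| i * 2) and skip(1), returning what arrived.
fn run_pipeline() -> Vec<i32> {
    // Built from the destination outwards: map wraps skip, skip wraps collect.
    let mut chain = MapSubscriber {
        destination: SkipSubscriber {
            destination: Collect(Vec::new()),
            remaining: 1,
        },
        mapper: |i: i32| i * 2,
    };
    for i in 1..=5 {
        chain.next(i);
    }
    chain.destination.destination.0
}
```

Values enter at the outermost subscriber (upstream side) and travel inwards, so the deepest element is the destination that finally stores `4, 6, 8, 10`.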
UpgradeableObserver
When subscribing to an Observable, sometimes we want the observable to be able to send an unsubscribe call to its destination, and sometimes the destination should be detached.
Remember: A subscriber is both an observer and a subscription.
Regular subscribers implement UpgradeableObserver by returning themselves, as
there is nothing to upgrade: they already are subscribers. Observers do not have a
SubscriptionLike impl, so when they are used as a destination in a subscribe
call they need to pick another subscriber to wrap themselves in. This is
usually the detached implementation.
Detached Subscriber
A subscriber is detached if it completely avoids sending unsubscribe, or in
some cases even complete signals.
Detached subscribers can’t unsubscribe downstream, serving as a hard boundary for unsubscription.
Why do errors have their own channel?
Since each operator and subscriber implements and does only one thing, dealing
with erroneous values in every operator would be very tedious. Imagine that
when you have an observable that emits Results because it’s fallible, your
mappers would need to do an inner map:
fallible_observable
.map(|i_result| i_result.map(|i| i * 2))
.subscribe(...);
In case you do want to move errors between the error and next channels, you can use the into_result operator to combine all upstream next and error signals into only next signals downstream, changing the downstream error type to Never.
And using the lift_error operator, you can unpack upstream Result values into downstream next and error signals. (In this case, you actually have 2 separate error types: the type of the upstream error signal, and the error type of the upstream next Results. This is why you need to supply an error mapping function to this operator.)
Other Operators
More detailed information on individual operators and their behavior can be seen in their documentation page here in the book, or their package readme (which are the same documents).
The most important information on them are also available on the operators and (primarily) the extension functions themselves too for easy access during development!
Subjects
A subject is something that is both an observable and an observer at the same time!
This makes subjects capable of feeding data into subscriptions from “outside” of them!
Run this example:
cargo run --package rx_core_subject_publish --example subject_example
let mut subject = PublishSubject::<i32>::default();
subject.next(1); // Meteora - Track 11 (no subscribers yet, so nobody receives this)
let mut subscription = subject
.clone()
.subscribe(PrintObserver::<i32>::new("subject_example"));
subject.next(2);
subject.next(3);
subscription.unsubscribe();
subject.next(4);
Output:
subject_example - next: 2
subject_example - next: 3
finalize
subject_example - unsubscribed
We can clearly see that only the values emitted while the subscription was active were observed!
Multicasting
As with any observable, a subject can be subscribed to multiple times! This means subjects are fundamentally multicasting!
Whenever you put a value inside it, all of its subscribers will receive it.
Once unsubscribed, no new values can be emitted by the subject. New subscriptions attempted on the subject will be immediately unsubscribed.
Example:
Run this example:
cargo run --package rx_core_subject_publish --example subject_multicasting_example
let mut subject = PublishSubject::<i32>::default();
subject.next(1);
let mut subscription_1 = subject
.clone()
.finalize(|| println!("finalize subscription 1"))
.subscribe(PrintObserver::<i32>::new("subject_subscription_1"));
subject.next(2);
let _subscription_2 = subject
.clone()
.finalize(|| println!("finalize subscription 2"))
.subscribe(PrintObserver::<i32>::new("subject_subscription_2"));
subject.next(3);
subscription_1.unsubscribe();
subject.next(4);
Output:
subject_subscription_1 - next: 2
subject_subscription_1 - next: 3
subject_subscription_2 - next: 3
finalize subscription 1
subject_subscription_1 - unsubscribed
subject_subscription_2 - next: 4
finalize subscription 2
subject_subscription_2 - unsubscribed
You can see that the signal 3 was heard by both subscriptions! And each
subscription had its own finalize callback! Each individual subscription is
unique and can have as many or as few operators on it as you want!
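The multicasting behavior shown above can be approximated in a few lines of std-only Rust. `MiniSubject` is a hypothetical simplification: it uses numeric ids instead of subscription objects and has no error or completion channel, so it models only the "every active subscriber receives each value" part.

```rust
/// Hypothetical, simplified publish subject keyed by subscriber ids.
struct MiniSubject<T> {
    next_id: usize,
    subscribers: Vec<(usize, Box<dyn FnMut(&T)>)>,
}

impl<T> MiniSubject<T> {
    fn new() -> Self {
        Self {
            next_id: 0,
            subscribers: Vec::new(),
        }
    }

    /// Registers a callback and returns an id that can later unsubscribe it.
    fn subscribe(&mut self, on_next: impl FnMut(&T) + 'static) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        self.subscribers.push((id, Box::new(on_next)));
        id
    }

    /// Removes one subscriber; the others keep receiving values.
    fn unsubscribe(&mut self, id: usize) {
        self.subscribers.retain(|(existing, _)| *existing != id);
    }

    /// Multicasts one value to every currently active subscriber.
    fn next(&mut self, value: T) {
        for (_, on_next) in self.subscribers.iter_mut() {
            on_next(&value);
        }
    }
}
```

Replaying the sequence from the example (emit 1, subscribe, emit 2, subscribe again, emit 3, unsubscribe the first, emit 4) delivers 2 and 3 to the first callback and 3 and 4 to the second.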
PublishSubject
The vanilla subject, it multicasts incoming values to currently active subscribers.
Only completion and error signals are replayed to new subscribers if the subject was finished. If the subject was also unsubscribed, the new subscription too will be immediately unsubscribed.
As all other subjects are just wrappers around PublishSubject, this behavior is shared across all of them.
BehaviorSubject
A BehaviorSubject is a subject that always has exactly 1 value of its input type stored, therefore to create a new BehaviorSubject, you must provide an initial value.
On subscription, the currently stored value (initially the value you provided) is emitted immediately!
This makes the BehaviorSubject ideal to be used as a reactive storage: a value that can change over time, where subscribers are always reacting to the latest value without having to wait for it!
BehaviorSubjects continue to replay even after being unsubscribed, but they can’t receive new values, and the new subscription will be immediately unsubscribed. However, they do not replay once they have errored!
Not every type of value has a sensible default, and even if it does, sometimes it doesn’t make sense to use it in the given context! In that case, use a ReplaySubject<1, _>!
Example:
Run this example:
cargo run --package rx_core_subject_behavior --example behavior_subject_example
let mut subject = BehaviorSubject::<i32>::new(10);
// Immediately prints "hello 10"
let mut hello_subscription = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hello"));
subject.next(11);
let _s1 = subject
.clone()
.map(|next| next * 2)
.subscribe(PrintObserver::<i32>::new("hi double"));
subject.next(12);
hello_subscription.unsubscribe();
subject.next(13);
subject.complete();
let mut _completed_subscription = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hello_completed"));
Output:
hello - next: 10
hello - next: 11
hi double - next: 22
hello - next: 12
hi double - next: 24
hello - unsubscribed
hi double - next: 26
hi double - completed
hi double - unsubscribed
hello_completed - next: 13
hello_completed - completed
hello_completed - unsubscribed
ReplaySubject
A ReplaySubject is a subject that buffers the last N emissions, and when
subscribed to immediately replays all of them!
Unlike BehaviorSubject, it does not guarantee that a value is always present, because it does not require an initial value to create it.
Like BehaviorSubjects, ReplaySubjects continue to replay even after being unsubscribed, but they can’t receive new values, and the new subscription will be immediately unsubscribed.
You can still next some values immediately into it if you want!
This makes the ReplaySubject ideal to cache something that does not have a sensible default, initial value!
For example: You’re waiting for a height measurement, which is a number, and
numbers have a default value of 0. Some pipelines downstream take this
measurement and calculate some things for you. It does not make sense to run
that computation with the value 0 as it’s not an actual measurement, just a
default. For this situation you can have either a ReplaySubject<1, f32> or a
BehaviorSubject<Option<f32>>(None). Sometimes you want stuff to start
immediately, even if there is no actual value. Or want this thing to return to
an initial, “uninitialized” state.
Example:
Run this example:
cargo run --package rx_core_subject_replay --example replay_subject_example
let mut subject = ReplaySubject::<2, i32>::default();
// Doesn't print out anything on subscribe
let _s = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hello"));
subject.next(1);
subject.next(2);
subject.next(3);
// Only the last two values are printed out, since our capacity is just 2
let _s2 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hi"));
subject.next(4);
subject.next(5);
Output:
hello - next: 1
hello - next: 2
hello - next: 3
hi - next: 2
hi - next: 3
hello - next: 4
hi - next: 4
hello - next: 5
hi - next: 5
hi - unsubscribed
hello - unsubscribed
When the second subscription subscribed, the buffer contained [2, 3], and both
values were immediately received by the new subscription!
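The buffering part of this behavior can be sketched with a bounded `VecDeque`. `ReplayBuffer` is a hypothetical helper written for this illustration, mirroring the `<capacity, type>` parameter order of `ReplaySubject::<2, i32>`; it models only which values a late subscriber would be handed, not the subject itself.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified replay buffer keeping only the last `N` values.
struct ReplayBuffer<const N: usize, T> {
    values: VecDeque<T>,
}

impl<const N: usize, T: Clone> ReplayBuffer<N, T> {
    fn new() -> Self {
        Self {
            values: VecDeque::with_capacity(N),
        }
    }

    /// Stores a value, evicting the oldest one when the buffer is full.
    fn push(&mut self, value: T) {
        if N == 0 {
            return;
        }
        if self.values.len() == N {
            self.values.pop_front();
        }
        self.values.push_back(value);
    }

    /// What a new subscriber would receive immediately on subscribe.
    fn replay(&self) -> Vec<T> {
        self.values.iter().cloned().collect()
    }
}
```

With a capacity of 2, pushing 1, 2 and 3 leaves `[2, 3]` to be replayed, matching the two lines the "hi" subscription prints first in the output above.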
AsyncSubject
The AsyncSubject will only emit once it completes.
Late subscribers who subscribe after it has already completed will also receive the last result, followed immediately by a completion signal.
What it will emit on completion depends on the reducer function used.
By default, it just replaces the result with the most recent observed
value next-ed into the subject.
But you can also specify your own reducer to accumulate all observed
values to be the result on completion.
Example:
Run this example:
cargo run --package rx_core_subject_async --example async_subject_example
let mut subject = AsyncSubject::<i32>::default();
let mut _subscription_1 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("async_subject sub_1"));
subject.next(1);
subject.next(2);
let mut _subscription_2 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("async_subject sub_2"));
subject.next(3);
subject.complete();
let mut _subscription_3 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("async_subject sub_3"));
Output:
async_subject sub_1 - next: 3
async_subject sub_2 - next: 3
async_subject sub_1 - completed
async_subject sub_2 - completed
async_subject sub_1 - unsubscribed
async_subject sub_2 - unsubscribed
async_subject sub_3 - next: 3
async_subject sub_3 - completed
async_subject sub_3 - unsubscribed
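The role of the reducer can be illustrated without the crate. In the sketch below, `AsyncResult` and `keep_latest` are hypothetical, and the reducer shape `FnMut(T, T) -> T` is an assumption made for this example; the real reducer signature may differ.

```rust
/// Hypothetical, simplified accumulator for an `AsyncSubject`-like type.
struct AsyncResult<T, F> {
    result: Option<T>,
    reducer: F,
}

impl<T, F: FnMut(T, T) -> T> AsyncResult<T, F> {
    fn new(reducer: F) -> Self {
        Self {
            result: None,
            reducer,
        }
    }

    /// Nothing is emitted here; the value is only folded into the result.
    fn next(&mut self, value: T) {
        self.result = Some(match self.result.take() {
            Some(accumulated) => (self.reducer)(accumulated, value),
            None => value,
        });
    }

    /// Completion hands out the single accumulated result, if any value arrived.
    fn complete(self) -> Option<T> {
        self.result
    }
}

/// Default-like reducer: the most recent value replaces the stored one.
fn keep_latest<T>(_previous: T, latest: T) -> T {
    latest
}
```

With `keep_latest`, feeding 1, 2 and 3 completes with 3, as in the output above; with a summing reducer such as `|acc: i32, value: i32| acc + value` the same inputs would complete with 6.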
Scheduling
Every example so far was “immediate”, they either emitted all their values immediately, or - in the case of subjects - when we pushed values into them.
But what really makes observables useful is when they can react to things and emit a signal when they do! And for that, a subscription or a subscriber needs to be able to emit signals even without upstream signals triggering its own logic.
This requires something that runs in the “background” to drive the work issued by subscribers or subscriptions.
For example: “Do a next call on this, 2 seconds from now!” or “Call next on this every 200ms from now, until I say stop!”
Scheduler
The scheduler is a shared queue that subscribers have access to (always passed in by the user) to issue work to be executed.
Executor
The executor is responsible for driving the work delegated to the scheduler and collected in the scheduler’s queue. It owns the scheduler, and handles to the scheduler can be acquired from the executor.
Work
There are multiple types of work, depending on how they are handled with respect to time, so that the issuer of the work does not have to reimplement basic time based logic like a delay.
Work can be issued, cancelled, or invoked from the scheduler queue.
Immediate Work
The simplest type of work, it executes as soon as the executor receives it.
Delayed Work
Delayed work will be executed only after its specified delay has passed.
Repeated Work
Repeated work re-executes its work each time it repeats, after a specified time interval.
Continuous Work
Continuous work is like repeated work but without the time interval: it simply executes as often as it can.
It depends on the executor to define the actual frequency this type of work is running at.
- With the tickable executor, this means on every tick call.
  - In Bevy, this means once every frame.
- In an async executor, this would be set to a reasonably fast interval, like a target FPS.
Invoked Work
Invoked work is not executed automatically, but it can be “invoked” by its
invoke_id, which means executing it as soon as the executor can.
For a ticking executor this means the next tick after invocation.
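To show how a tick-driven executor can handle time on behalf of the work it runs, here is a std-only sketch covering immediate and delayed work. `MiniExecutor` and `DelayedWork` are hypothetical; modeling immediate work as delayed work with a zero delay that runs on the next tick is an assumption of this sketch, not a description of the real executor.

```rust
use std::time::Duration;

/// Hypothetical unit of work: becomes runnable once `due` has been reached.
struct DelayedWork {
    due: Duration,
    label: &'static str,
}

/// Hypothetical, simplified tick-driven executor.
#[derive(Default)]
struct MiniExecutor {
    now: Duration,
    queue: Vec<DelayedWork>,
    executed: Vec<&'static str>,
}

impl MiniExecutor {
    /// Queues work relative to the current time; a zero delay means "immediate".
    fn schedule(&mut self, delay: Duration, label: &'static str) {
        self.queue.push(DelayedWork {
            due: self.now + delay,
            label,
        });
    }

    /// Advances time and runs everything that became due, earliest first.
    fn tick(&mut self, delta: Duration) {
        self.now += delta;
        let now = self.now;
        let (mut ready, waiting): (Vec<_>, Vec<_>) = std::mem::take(&mut self.queue)
            .into_iter()
            .partition(|work| work.due <= now);
        ready.sort_by_key(|work| work.due);
        self.executed.extend(ready.into_iter().map(|work| work.label));
        self.queue = waiting;
    }
}
```

The issuer only states a delay; comparing due times against the clock happens entirely inside `tick`, which is the division of labor the work types above are meant to provide.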
Scheduler Context
Executors define a context, passed in as a mutable reference to the work
whenever it is executed. The main job of the context is to provide the
current time (as a Duration, denoting the time passed since startup).
Contexts can also provide executor specific capabilities. For example: interacting with the Bevy ECS world.
The context is defined by the executor, but subscribers can be written for specific contexts too. Resulting in context specific subscribers with extra capabilities relevant only in that context, compatible only with that executor.
Scheduler Work Input
Most generic scheduled subscribers do not need to know about anything besides the time coming from the context. Still, some executors can provide extra data relevant to the execution of the work at that moment.
For example: In the TickingExecutor, the Tick object is passed into every executed work.
TickingExecutor
The base executor used both for tests and for Bevy.
It can be manually advanced by calling tick.
Time can only move forwards in the executor!
Interval Example
let mut mock_executor = MockExecutor::new_with_logging();
let scheduler = mock_executor.get_scheduler_handle();
let mut interval_observable = IntervalObservable::new(
IntervalObservableOptions {
duration: Duration::from_secs(1),
max_emissions_per_tick: 3,
start_on_subscribe: true,
},
scheduler,
);
let _subscription = interval_observable.subscribe(PrintObserver::new("interval_observable"));
mock_executor.tick(Duration::from_millis(600));
mock_executor.tick(Duration::from_millis(401));
mock_executor.tick(Duration::from_millis(16200)); // lag spike! would result in 16 emissions, but the limit is 3!
mock_executor.tick(Duration::from_millis(1200));
mock_executor.tick(Duration::from_millis(2200));
Output:
interval_observable - next: 0
Ticking... (600ms)
Ticking... (401ms)
interval_observable - next: 1
Ticking... (16.2s)
interval_observable - next: 2
interval_observable - next: 3
interval_observable - next: 4
Ticking... (1.2s)
interval_observable - next: 5
Ticking... (2.2s)
interval_observable - next: 6
interval_observable - next: 7
interval_observable - unsubscribed
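The emission counts in this output can be reproduced with a small model of the catch-up logic. `MiniInterval` is hypothetical and works in whole milliseconds; the policy it uses (cap the emissions per tick, drop the skipped intervals, keep the partial period) is an assumption that happens to match the counts above, and the real implementation may differ.

```rust
/// Hypothetical, simplified interval state, measured in milliseconds.
struct MiniInterval {
    period_ms: u64,
    max_emissions_per_tick: u64,
    elapsed_ms: u64,
    next_value: u64,
}

impl MiniInterval {
    /// Advances time by `delta_ms` and returns the values emitted in this tick.
    /// `period_ms` must be greater than zero.
    fn tick(&mut self, delta_ms: u64) -> Vec<u64> {
        self.elapsed_ms += delta_ms;
        let due = self.elapsed_ms / self.period_ms;
        // Keep only the partial period: intervals skipped by the cap are dropped.
        self.elapsed_ms %= self.period_ms;
        let count = due.min(self.max_emissions_per_tick);
        let first = self.next_value;
        self.next_value += count;
        (first..first + count).collect()
    }
}
```

Starting with `next_value: 1` (value 0 was already emitted on subscribe), a 1000 ms period and a cap of 3, the ticks 600, 401, 16200, 1200 and 2200 ms produce no value, then 1, then 2 to 4, then 5, then 6 and 7.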
Usage Within Bevy
Add the RxPlugin and one or more RxSchedulerPlugins for the schedules you
want to run your scheduled observables in.
If you do not use scheduled observables, you can skip adding the RxSchedulerPlugins. You can tell that an observable is scheduled because it needs a SchedulerHandle. Some Commands extensions also require a SchedulerHandle to create any kind of subscription!
fn main() -> AppExit {
App::new()
.add_plugins((
DefaultPlugins,
RxPlugin,
RxSchedulerPlugin::<Update, Virtual>::default(),
))
.run()
}
To access a scheduler, use the RxSchedule<BevySchedule, Clock> SystemParam:
fn setup_subscription(
mut commands: Commands,
rx_schedule_update_virtual: RxSchedule<Update, Virtual>,
) {
// Use `rx_schedule_update_virtual` to get a `SchedulerHandle` for
// creating scheduled observables within this system.
}
Now you can create subscriptions that are fully integrated with Bevy’s ECS, live as entities, and react to Bevy events and component removals:
Use RxSignal<Out, OutError> to observe signals from EntityDestination subscriptions!
fn setup_subscription(
mut commands: Commands,
rx_schedule_update_virtual: RxSchedule<Update, Virtual>,
) {
let destination_entity = commands
.spawn_empty()
.observe(|signal: Trigger<RxSignal<usize>>| println!("{:?}", signal.signal()))
.id();
let observable_entity = commands
.spawn(
IntervalObservable::new(
IntervalObservableOptions {
duration: Duration::from_secs(1),
start_on_subscribe: true,
max_emissions_per_tick: 1,
},
rx_schedule_update_virtual.handle(),
)
// .map(|i| i.to_string()) // This would change the output type of the observable, making the subscribe command below fail!
.into_component(),
)
.id();
// This is now **not** an `EntitySubscription`, as the subscription
// will be made once the command executes! It's just an `Entity`!
// Put it somewhere so you can despawn it!
let _subscription_entity = commands.subscribe(
observable_entity,
EntityDestination::<usize, Never>::new(
destination_entity,
rx_schedule_update_virtual.handle(),
),
);
}
Or you can create subscriptions that only partially integrate with Bevy’s ECS:
It’s perfectly fine to not use observables as components! You can create subscriptions directly from observables created in systems! Just be careful not to drop the subscription, as subscriptions unsubscribe on drop!
fn setup_subscription(
mut commands: Commands,
rx_schedule_update_virtual: RxSchedule<Update, Virtual>,
mut my_subscriptions: ResMut<MySubscriptions>,
) {
let destination_entity = commands
.spawn_empty()
.observe(|signal: Trigger<RxSignal<usize>>| println!("{:?}", signal.signal()))
.id();
let subscription = IntervalObservable::new(
IntervalObservableOptions {
duration: Duration::from_secs(1),
start_on_subscribe: true,
max_emissions_per_tick: 1,
},
rx_schedule_update_virtual.handle(),
)
.subscribe(EntityDestination::new(
destination_entity,
rx_schedule_update_virtual.handle(),
));
my_subscriptions.add(subscription);
}
Use operators to create more complex observables, and orchestrate your events!
KeyboardObservable::new(default(), rx_schedule_update_virtual.handle())
.filter(|key_code, _| {
matches!(
key_code,
KeyCode::Digit1 | KeyCode::Digit2 | KeyCode::Digit3 | KeyCode::Digit4
)
})
.start_with(KeyCode::Digit3)
.switch_map(
move |key_code| {
let duration = match key_code {
KeyCode::Digit1 => Duration::from_millis(5),
KeyCode::Digit2 => Duration::from_millis(100),
KeyCode::Digit3 => Duration::from_millis(500),
KeyCode::Digit4 => Duration::from_millis(2000),
_ => unreachable!(),
};
println!("Switching to a new inner observable with duration: {duration:?}");
IntervalObservable::new(
IntervalObservableOptions {
duration,
start_on_subscribe: false,
max_emissions_per_tick: 4,
},
schedule_update_virtual.clone(),
)
},
Never::map_into(),
)
.scan(|acc, _next| acc + 1, 0_usize)
Use subjects or the share operator to multicast observables to multiple
subscribers, save computation by sharing a single subscription!
Observables (Core)
observable_closed
An observable that immediately closes without completing or emitting any values.
See also
- CreateObservable - Define your own function that will interact with the subscriber!
- DeferredObservable - Subscribes to an observable returned by a function.
- JustObservable - Immediately emits a single value!
- EmptyObservable - Immediately completes!
- ThrowObservable - Immediately errors!
- NeverObservable - Never emits, never unsubscribes! Only once dropped!
Example
cargo run -p rx_core --example observable_closed_example
let _subscription = closed().subscribe(PrintObserver::new("closed"));
println!("end");
Output:
closed - unsubscribed
end
observable_combine_changes
The CombineChangesObservable subscribes to two input observables, and emits
when either of them emits, even if the other hasn't emitted yet.
It wraps downstream signals into the Change<T> enum, where a value is either:
- JustUpdated - When this value's change caused the emission.
- Latest - When this value did not change since the last emission.
- None - When this observable has not emitted yet.
See Also
- CombineLatestObservable - Subscribes to two different observables, and emits the latest of both values when either of them emits. It only starts emitting once both have emitted at least once.
- ZipObservable - Subscribes to two different observables, and emits both values when both of them emit, pairing up emissions by the order they happened.
- JoinObservable - Subscribes to two different observables, and emits the latest of both values once both of them have completed!
Example
cargo run -p rx_core --example observable_combine_changes_example
let mut greetings_subject = PublishSubject::<&'static str>::default();
let mut count_subject = PublishSubject::<usize>::default();
let mut subscription = combine_changes(greetings_subject.clone(), count_subject.clone())
.subscribe(PrintObserver::new("combine_changes"));
greetings_subject.next("Hello!");
count_subject.next(10);
count_subject.next(20);
greetings_subject.next("Szia!");
greetings_subject.complete();
count_subject.next(30);
count_subject.complete();
subscription.unsubscribe();
Output:
combine_changes - next: (JustUpdated("Hello!"), None)
combine_changes - next: (Latest("Hello!"), JustUpdated(10))
combine_changes - next: (Latest("Hello!"), JustUpdated(20))
combine_changes - next: (JustUpdated("Szia!"), Latest(20))
combine_changes - next: (Latest("Szia!"), JustUpdated(30))
combine_changes - completed
combine_changes - unsubscribed
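The tagging behavior shown in the output above can be sketched with a small std-only model. This is not rx_core's implementation: the `Change` enum here is a hypothetical stand-in mirroring the three states described above, and `Input` and `combine_changes_model` are names invented for illustration.

```rust
/// Hypothetical stand-in for the `Change<T>` wrapper described above.
#[derive(Clone, Debug, PartialEq)]
enum Change<T> {
    JustUpdated(T),
    Latest(T),
    None,
}

/// Marks which of the two inputs produced a value (illustrative only).
enum Input<A, B> {
    Left(A),
    Right(B),
}

/// Wraps the last seen value of the side that did NOT cause the emission.
fn unchanged<T: Clone>(latest: &Option<T>) -> Change<T> {
    match latest {
        Some(value) => Change::Latest(value.clone()),
        None => Change::None,
    }
}

/// Replays a sequence of inputs and returns one tagged pair per input.
fn combine_changes_model<A: Clone, B: Clone>(
    inputs: Vec<Input<A, B>>,
) -> Vec<(Change<A>, Change<B>)> {
    let mut left: Option<A> = None;
    let mut right: Option<B> = None;
    let mut emitted = Vec::new();
    for input in inputs {
        match input {
            Input::Left(value) => {
                left = Some(value.clone());
                emitted.push((Change::JustUpdated(value), unchanged(&right)));
            }
            Input::Right(value) => {
                right = Some(value.clone());
                emitted.push((unchanged(&left), Change::JustUpdated(value)));
            }
        }
    }
    emitted
}

fn main() {
    let emitted = combine_changes_model(vec![
        Input::Left("Hello!"),
        Input::Right(10),
        Input::Right(20),
        Input::Left("Szia!"),
        Input::Right(30),
    ]);
    for pair in &emitted {
        println!("{pair:?}");
    }
}
```

Every input produces an emission here, which is the difference from a combine-latest style combinator that stays silent until both sides have a value.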
observable_combine_latest
The CombineLatestObservable subscribes to two input observables, and emits
the latest of both values when either of them emits. It only starts emitting
once both have emitted at least once.
See Also
- CombineChangesObservable - Subscribes to two different observables, and emits the latest of both values when either of them emits. It denotes which one has changed, and it emits even when one of them hasn't emitted yet.
- ZipObservable - Subscribes to two different observables, and emits both values when both of them emit, pairing up emissions by the order they happened.
- JoinObservable - Subscribes to two different observables, and emits the latest of both values once both of them have completed!
Example
cargo run -p rx_core --example observable_combine_latest_example
let mut greetings_subject = PublishSubject::<&'static str>::default();
let mut count_subject = PublishSubject::<usize>::default();
let mut subscription = combine_latest(
greetings_subject
.clone()
.tap(PrintObserver::new("greetings_subject")),
count_subject
.clone()
.tap(PrintObserver::new("count_subject")),
)
.subscribe(PrintObserver::new("combine_latest"));
greetings_subject.next("Hello!");
count_subject.next(10);
count_subject.next(20);
greetings_subject.next("Szia!");
greetings_subject.complete();
count_subject.next(30);
count_subject.complete();
subscription.unsubscribe();
Output:
greetings_subject - next: "Hello!"
count_subject - next: 10
combine_latest - next: ("Hello!", 10)
count_subject - next: 20
combine_latest - next: ("Hello!", 20)
greetings_subject - next: "Szia!"
combine_latest - next: ("Szia!", 20)
greetings_subject - completed
greetings_subject - unsubscribed
count_subject - next: 30
combine_latest - next: ("Szia!", 30)
count_subject - completed
count_subject - unsubscribed
combine_latest - completed
combine_latest - unsubscribed
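The "only starts emitting once both have emitted" rule can be illustrated with a two-slot model using only the standard library. `CombineLatestModel` and its methods are hypothetical names for this sketch and say nothing about how rx_core stores its state.

```rust
/// Hypothetical two-slot state for a combine-latest style combinator.
struct CombineLatestModel<A, B> {
    left: Option<A>,
    right: Option<B>,
}

impl<A: Clone, B: Clone> CombineLatestModel<A, B> {
    fn new() -> Self {
        Self { left: None, right: None }
    }

    /// Stores the newest left value and emits if both slots are filled.
    fn push_left(&mut self, value: A) -> Option<(A, B)> {
        self.left = Some(value);
        self.latest()
    }

    /// Stores the newest right value and emits if both slots are filled.
    fn push_right(&mut self, value: B) -> Option<(A, B)> {
        self.right = Some(value);
        self.latest()
    }

    /// `None` until both sides have produced at least one value.
    fn latest(&self) -> Option<(A, B)> {
        self.left.clone().zip(self.right.clone())
    }
}

fn main() {
    let mut model = CombineLatestModel::<&str, i32>::new();
    println!("{:?}", model.push_left("Hello!")); // nothing yet: right side is empty
    println!("{:?}", model.push_right(10));
    println!("{:?}", model.push_right(20));
    println!("{:?}", model.push_left("Szia!"));
}
```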
observable_concat
Combine many observables of the same output type into one by subscribing to them sequentially in order.
See Also
- MergeObservable - Combine many observables of the same output type by subscribing to all of them at once.
Example
cargo run -p rx_core --example observable_concat_example
let mut subject_1 = PublishSubject::<usize>::default();
let mut subject_2 = PublishSubject::<usize>::default();
let mut subject_3 = PublishSubject::<usize>::default();
let _subscription = concat((
subject_1.clone(),
subject_2.clone().take(2),
subject_3.clone(),
))
.subscribe(PrintObserver::new("concat_operator"));
subject_1.next(1);
subject_1.complete();
subject_3.complete();
subject_2.next(2);
subject_2.next(3);
Output:
concat_operator - next: 1
concat_operator - next: 2
concat_operator - next: 3
concat_operator - completed
concat_operator - unsubscribed
observable_connectable
Maintains an internal connector subject and only subscribes the source when
connect is called, letting multiple subscribers share that connection.
See Also
- ShareOperator - Multicast a source through a connector so downstream subscribers share one upstream subscription. The connector can be any subject.
Example
cargo run -p rx_core --example observable_connectable_example
let mut source = PublishSubject::<usize>::default();
let mut connectable = ConnectableObservable::new(
source.clone().finalize(|| println!("disconnected...")),
ConnectableOptions {
connector_provider: ProvideWithDefault::<ReplaySubject<1, _>>::default(),
disconnect_when_ref_count_zero: true,
reset_connector_on_disconnect: false,
reset_connector_on_complete: false,
reset_connector_on_error: false,
},
);
let mut _subscription_0 = connectable.subscribe(PrintObserver::new("connectable_observable 0"));
source.next(0);
println!("connect!");
let _connection = connectable.connect();
source.next(1);
connectable.disconnect();
let mut _subscription_1 = connectable.subscribe(PrintObserver::new("connectable_observable 1"));
println!("connect again!");
connectable.connect();
source.next(2);
println!("end");
Output:
connect!
connectable_observable 0 - next: 1
disconnected...
connectable_observable 1 - next: 1
connect again!
connectable_observable 1 - next: 2
connectable_observable 0 - next: 2
end
connectable_observable 1 - unsubscribed
disconnected...
connectable_observable 0 - unsubscribed
observable_create
The create_observable provides a simple way to create custom observables by
defining a producer function that can emit values to subscribers.
The producer function is cloned for each subscribe call to avoid shared state between individual subscriptions.
See Also
- DeferredObservable - Subscribes to an observable returned by a function.
- JustObservable - Immediately emits a single value!
- EmptyObservable - Immediately completes!
- ThrowObservable - Immediately errors!
- ClosedObservable - Immediately unsubscribes!
- NeverObservable - Never emits, never unsubscribes! Only once dropped!
Example
cargo run -p rx_core --example observable_create_example
let _s = create_observable::<&str, Never, _>(|destination| {
destination.next("hello");
destination.complete();
})
.subscribe(PrintObserver::new("create_observable"));
Output:
create_observable - next: "hello"
create_observable - completed
create_observable - unsubscribed
observable_deferred
Subscribes to an observable returned by a function.
See Also
- CreateObservable - Define your own function that will interact with the subscriber!
- JustObservable - Immediately emits a single value!
- EmptyObservable - Immediately completes!
- ThrowObservable - Immediately errors!
- ClosedObservable - Immediately unsubscribes!
- NeverObservable - Never emits, never unsubscribes! Only once dropped!
Example
cargo run -p rx_core --example observable_deferred_example
let i = RefCell::new(1);
let mut deferred = deferred_observable(|| {
println!("subscribe!");
(0..=*i.borrow()).into_observable()
});
*i.borrow_mut() = 2;
let _subscription = deferred.subscribe(PrintObserver::new("deferred_observable"));
Output:
subscribe!
deferred_observable - next: 0
deferred_observable - next: 1
deferred_observable - next: 2
deferred_observable - completed
deferred_observable - unsubscribed
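The reason the example above emits up to 2 rather than 1 is that the factory closure runs at subscribe time, not when the deferred observable is defined. The same late evaluation can be seen with a plain closure; `make_factory` is a hypothetical helper for this std-only sketch and does not use rx_core.

```rust
use std::cell::RefCell;

/// Returns a "recipe" that reads `upper` only when it is finally called.
fn make_factory(upper: &RefCell<i32>) -> impl Fn() -> Vec<i32> + '_ {
    move || (0..=*upper.borrow()).collect()
}

fn main() {
    let upper = RefCell::new(1);
    // Defining the factory does not read `upper` yet.
    let factory = make_factory(&upper);
    // Changing the value before the factory runs affects the result.
    *upper.borrow_mut() = 2;
    println!("{:?}", factory());
}
```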
observable_empty
Immediately completes.
See Also
- CreateObservable - Define your own function that will interact with the subscriber!
- DeferredObservable - Subscribes to an observable returned by a function.
- JustObservable - Immediately emits a single value!
- ThrowObservable - Immediately errors!
- ClosedObservable - Immediately unsubscribes!
- NeverObservable - Never emits, never unsubscribes! Only once dropped!
Example
cargo run -p rx_core --example observable_empty_example
let _subscription = empty().subscribe(PrintObserver::new("empty"));
Output:
empty - completed
empty - unsubscribed
observable_interval
Emits a sequence of usize values every time the configured duration elapses.
See Also
- TimerObservable - Emits once after the timer elapses.
Example
Run the example with:
cargo run -p rx_core --example observable_interval_example
let mut mock_executor = MockExecutor::new_with_logging();
let scheduler = mock_executor.get_scheduler_handle();
let mut interval_observable = IntervalObservable::new(
IntervalObservableOptions {
duration: Duration::from_secs(1),
max_emissions_per_tick: 3,
start_on_subscribe: true,
},
scheduler,
);
let _subscription = interval_observable.subscribe(PrintObserver::new("interval_observable"));
mock_executor.tick(Duration::from_millis(600));
mock_executor.tick(Duration::from_millis(401));
mock_executor.tick(Duration::from_millis(16200));
mock_executor.tick(Duration::from_millis(1200));
mock_executor.tick(Duration::from_millis(2200));
Output:
interval_observable - next: 0
Ticking... (600ms)
Ticking... (401ms)
interval_observable - next: 1
Ticking... (16.2s)
interval_observable - next: 2
interval_observable - next: 3
interval_observable - next: 4
Ticking... (1.2s)
interval_observable - next: 5
Ticking... (2.2s)
interval_observable - next: 6
interval_observable - next: 7
interval_observable - unsubscribed
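The effect of max_emissions_per_tick in the output above can be reproduced with a simple accumulator. This is a std-only model, not rx_core's implementation: `IntervalModel` and its methods are hypothetical names, and the rule that whole intervals beyond the cap are dropped while the sub-interval remainder is kept is an assumption inferred from the output shown, not something the source states.

```rust
use std::time::Duration;

/// Hypothetical model of an interval timer with a per-tick emission cap.
struct IntervalModel {
    duration: Duration,
    max_emissions_per_tick: usize,
    elapsed: Duration,
    next_value: usize,
}

impl IntervalModel {
    fn new(duration: Duration, max_emissions_per_tick: usize) -> Self {
        assert!(!duration.is_zero(), "the interval must be non-zero");
        Self {
            duration,
            max_emissions_per_tick,
            elapsed: Duration::ZERO,
            next_value: 0,
        }
    }

    /// Models the immediate emission made when `start_on_subscribe` is set.
    fn start(&mut self) -> usize {
        let value = self.next_value;
        self.next_value += 1;
        value
    }

    /// Advances time and returns the values emitted during this tick.
    fn tick(&mut self, delta: Duration) -> Vec<usize> {
        self.elapsed += delta;
        let interval = self.duration.as_nanos();
        let due = (self.elapsed.as_nanos() / interval) as usize;
        // Assumption: keep only the part of the time that is shorter than one interval.
        self.elapsed = Duration::from_nanos((self.elapsed.as_nanos() % interval) as u64);
        let count = due.min(self.max_emissions_per_tick);
        let emitted = (self.next_value..self.next_value + count).collect();
        self.next_value += count;
        emitted
    }
}

fn main() {
    let mut model = IntervalModel::new(Duration::from_secs(1), 3);
    println!("{:?}", model.start());
    for millis in [600, 401, 16200, 1200, 2200] {
        println!("{:?}", model.tick(Duration::from_millis(millis)));
    }
}
```

The cap matters on the 16.2 second tick: sixteen intervals have elapsed, but only three values are emitted.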
observable_iterator
This crate provides functionality to convert iterators into observables using the IntoIteratorObservableExt extension trait.
See Also
- IteratorOnTickObservable - Emits iterator items one per scheduler tick.
Features
- Extension Trait: IntoIteratorObservableExt provides the into_observable() method for any type that implements IntoIterator + Clone
- Universal Support: Works with ranges, vectors, arrays, and any other iterator type
- No Conflicts: Uses an extension trait approach to avoid conflicts with the main IntoObservable trait
Usage
#![allow(unused)]
fn main() {
use rx_core::prelude::*;
// Convert ranges into observables
(1..=5).into_observable::<()>().subscribe(PrintObserver::new("range"));
// Convert vectors into observables
vec![1, 2, 3].into_observable::<()>().subscribe(PrintObserver::new("vector"));
// Convert arrays into observables
[10, 20, 30].into_observable::<()>().subscribe(PrintObserver::new("array"));
}
Examples
cargo run -p rx_core --example observable_iterator_into_example
cargo run -p rx_core --example observable_iterator_example
observable_iterator_on_tick
Emits iterator items one per scheduler tick.
See Also
- IteratorObservable - Emits all iterator items immediately when subscribed to.
Example
Run the example with:
cargo run -p rx_core --example observable_iterator_on_tick_example
let mut executor = MockExecutor::default();
let scheduler = executor.get_scheduler_handle();
let iterator_observable = IteratorOnTickObservable::new(
0..=7,
OnTickObservableOptions {
start_on_subscribe: true,
emit_at_every_nth_tick: 2,
},
scheduler,
);
let _subscription = iterator_observable
.finalize(|| println!("fin"))
.subscribe(PrintObserver::new("iterator_on_tick"));
println!("subscribed!");
executor.tick(Duration::from_millis(500));
executor.tick(Duration::from_millis(16));
executor.tick(Duration::from_millis(9001));
executor.tick(Duration::from_millis(0));
executor.tick(Duration::from_millis(10));
Output:
iterator_on_tick - next: 0
subscribed!
iterator_on_tick - next: 1
iterator_on_tick - next: 2
fin
iterator_on_tick - unsubscribed
observable_join
Emits the latest values from both inputs once both complete.
This observable will only emit once both of its input observables have completed. It then emits a tuple of the last emissions from each input observable, and completes.
This means that if even one of the observables hasn't emitted before all of them have completed, only a complete notification will be observed!
If not all observables complete, nothing will be emitted even if all input observables were primed.
See Also
- CombineChangesObservable - Emits the latest of two sources, tagging which side changed, even before both have emitted.
- CombineLatestObservable - Emits the latest of two sources whenever either emits, after both emitted at least once.
- ZipObservable - Emits paired tuples when both sources emit, matched by emission order.
Example
cargo run -p rx_core --example observable_join_example
let observable_1 = (1..=3).into_observable();
let observable_2 = (4..=6).into_observable();
let _subscription = join(observable_1, observable_2).subscribe(PrintObserver::new("join"));
Output:
join - next: (3, 6)
join - completed
join - unsubscribed
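For finite sources that always complete, the rule above reduces to "take the last item of each side, if both have one". A std-only sketch follows; `join_model` is a hypothetical helper that works on iterators rather than observables, so it cannot represent a source that never completes.

```rust
/// Pairs the last item of each input, or returns `None` if either input is empty.
fn join_model<A, B>(
    left: impl IntoIterator<Item = A>,
    right: impl IntoIterator<Item = B>,
) -> Option<(A, B)> {
    let last_left = left.into_iter().last();
    let last_right = right.into_iter().last();
    last_left.zip(last_right)
}

fn main() {
    // Both sides emitted before completing: one tuple of the last values.
    println!("{:?}", join_model(1..=3, 4..=6));
    // One side completed without emitting: nothing to pair, only completion remains.
    println!("{:?}", join_model(1..=3, std::iter::empty::<i32>()));
}
```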
observable_just
Immediately emits a single value.
See Also
- CreateObservable - Define your own function that will interact with the subscriber!
- DeferredObservable - Subscribes to an observable returned by a function.
- EmptyObservable - Immediately completes!
- ThrowObservable - Immediately errors!
- ClosedObservable - Immediately unsubscribes!
- NeverObservable - Never emits, never unsubscribes! Only once dropped!
Example
cargo run -p rx_core --example observable_just_example
let _s = just("hello").subscribe(PrintObserver::new("just"));
Output:
just - next: "hello"
just - completed
just - unsubscribed
observable_merge
Combine many observables of the same output type into one by subscribing to all of them at once.
See Also
- ConcatObservable - Combine many observables of the same output type by subscribing to them sequentially in order.
Example
cargo run -p rx_core --example observable_merge_example
let observable_1 = (1..=3).into_observable().skip(2);
let observable_2 = (4..=6).into_observable().take(1);
let observable_3 = (95..=98).into_observable();
let _subscription = merge((observable_1, observable_2, observable_3), usize::MAX)
.subscribe(PrintObserver::<i32>::new("merge_observable"));
Output:
merge_observable - next: 3
merge_observable - next: 4
merge_observable - next: 95
merge_observable - next: 96
merge_observable - next: 97
merge_observable - next: 98
merge_observable - completed
merge_observable - unsubscribed
observable_never
Never emits and never unsubscribes on its own; the subscription ends only once it is dropped!
Warning: You are responsible for unsubscribing from subscriptions made to this observable, as it will never do so on its own!
See Also
- CreateObservable - Define your own function that will interact with the subscriber!
- DeferredObservable - Subscribes to an observable returned by a function.
- JustObservable - Immediately emits a single value!
- EmptyObservable - Immediately completes!
- ThrowObservable - Immediately errors!
- ClosedObservable - Immediately unsubscribes!
Example
cargo run -p rx_core --example observable_never_example
let _subscription = never().subscribe(PrintObserver::new("never"));
println!("nothing happens before dropping the subscription!");
Output:
nothing happens before dropping the subscription!
never - unsubscribed
observable_throw
Immediately errors.
See Also
- CatchOperator - On error, switch to a recovery observable.
- RetryOperator - Resubscribe on error up to the configured retry count.
- CreateObservable - Define your own function that will interact with the subscriber!
- DeferredObservable - Subscribes to an observable returned by a function.
- JustObservable - Immediately emits a single value!
- EmptyObservable - Immediately completes!
- ClosedObservable - Immediately unsubscribes!
- NeverObservable - Never emits, never unsubscribes! Only once dropped!
Example
cargo run -p rx_core --example observable_throw_example
let _subscription = throw("hello").subscribe(PrintObserver::new("throw_example"));
Output:
throw_example - error: "hello"
observable_timer
Emits once after the timer elapses.
See Also
- IntervalObservable - Emits a sequence of usize values on every interval tick.
Example
Run the example with:
cargo run -p rx_core --example observable_timer_example
let mut mock_executor = MockExecutor::new_with_logging();
let scheduler = mock_executor.get_scheduler_handle();
let mut timer = TimerObservable::new(Duration::from_secs(1), scheduler);
let _subscription = timer.subscribe(PrintObserver::new("timer_observable"));
mock_executor.tick(Duration::from_millis(600));
mock_executor.tick(Duration::from_millis(400));
Output:
Ticking... (600ms)
Ticking... (400ms)
timer_observable - next: ()
timer_observable - completed
timer_observable - unsubscribed
observable_zip
Subscribes to two observables, emitting paired tuples when both have emitted, matching them in emission order.
See Also
- CombineChangesObservable - Emits the latest of two sources, tagging which side changed, even before both have emitted.
- CombineLatestObservable - Emits the latest of two sources whenever either emits, after both emitted at least once.
- JoinObservable - Emits the last values from both sources once both have completed.
Example
cargo run -p rx_core --example observable_zip_example
let observable_1 = (1..=3).into_observable();
let observable_2 = (4..=6).into_observable();
let _subscription = zip(observable_1, observable_2)
.subscribe(PrintObserver::new("zip_observable"));
Output:
zip_observable - next: (1, 4)
zip_observable - next: (2, 5)
zip_observable - next: (3, 6)
zip_observable - completed
zip_observable - unsubscribed
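Pairing "in emission order" means each side needs a queue, because one source can run ahead of the other. The sketch below models that with two `VecDeque`s from the standard library; `ZipModel` and its methods are hypothetical names and are not taken from rx_core.

```rust
use std::collections::VecDeque;

/// Hypothetical model: one queue of unpaired values per side.
struct ZipModel<A, B> {
    left: VecDeque<A>,
    right: VecDeque<B>,
}

impl<A, B> ZipModel<A, B> {
    fn new() -> Self {
        Self { left: VecDeque::new(), right: VecDeque::new() }
    }

    /// Queues a left value and emits a pair if the right side has one waiting.
    fn push_left(&mut self, value: A) -> Option<(A, B)> {
        self.left.push_back(value);
        self.try_pair()
    }

    /// Queues a right value and emits a pair if the left side has one waiting.
    fn push_right(&mut self, value: B) -> Option<(A, B)> {
        self.right.push_back(value);
        self.try_pair()
    }

    /// Pops the oldest value from each side, but only when both sides have one.
    fn try_pair(&mut self) -> Option<(A, B)> {
        if self.left.is_empty() || self.right.is_empty() {
            return None;
        }
        self.left.pop_front().zip(self.right.pop_front())
    }
}

fn main() {
    let mut zip = ZipModel::<i32, i32>::new();
    // The left side runs ahead; its values wait in the queue.
    for value in 1..=3 {
        println!("{:?}", zip.push_left(value));
    }
    // Each right value is paired with the oldest waiting left value.
    for value in 4..=6 {
        println!("{:?}", zip.push_right(value));
    }
}
```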
Observables (Bevy Specific)
observable_event
The EventObservable turns Bevy events triggered on an entity into signals,
allowing you to use any event as an observable source, and construct reactive
pipelines from them using operators.
Subscribers will observe events targeted at the specified entity, and a completion signal once the entity is despawned.
See Also
- KeyboardObservable - Observe global key input.
- MessageObservable - Observe messages written via MessageWriter.
- ProxyObservable - Subscribe to another observable entity.
- ResourceObservable - Observe derived values of a resource on change.
Example
cargo run -p rx_bevy --example observable_event_example
#[derive(Resource, Deref, DerefMut)]
pub struct DummyEventTarget(Entity);
#[derive(Resource, Default, Deref, DerefMut)]
pub struct ExampleSubscriptions(SharedSubscription);
fn setup(
mut commands: Commands,
rx_schedule_update_virtual: RxSchedule<Update, Virtual>,
mut subscriptions: ResMut<ExampleSubscriptions>,
) {
commands.spawn((
Camera3d::default(),
Transform::from_xyz(2., 6., 8.).looking_at(Vec3::ZERO, Vec3::Y),
));
let watched_entity = commands.spawn(Name::new("Watch me")).id();
subscriptions.add(
EventObservable::<DummyEvent>::new(watched_entity, rx_schedule_update_virtual.handle())
.subscribe(PrintObserver::new("event_observable")),
);
commands.insert_resource(DummyEventTarget(watched_entity));
}
Then, provided something is triggering DummyEvents on the watched entity:
Producer is sending DummyEvent { target: 6v1#4294967302 } to 6v1!
event_observable - next: DummyEvent { target: 6v1#4294967302 }
Producer is sending DummyEvent { target: 6v1#4294967302 } to 6v1!
event_observable - next: DummyEvent { target: 6v1#4294967302 }
...
observable_keyboard
The KeyboardObservable turns Bevy keyboard input events into signals. The
events are sourced from the ButtonInput<KeyCode> resource.
Options
KeyCode signals can be observed in multiple modes:
- KeyboardObservableEmit::JustPressed - emits once when the key is pressed down.
- KeyboardObservableEmit::JustReleased - emits once when the key is released.
- KeyboardObservableEmit::WhilePressed - emits continuously while the key is held down.
See Also
- EventObservable - Observe events sent to an entity.
- MessageObservable - Observe messages written via MessageWriter.
- ProxyObservable - Subscribe to another observable entity.
- ResourceObservable - Observe derived values of a resource on change.
Example
cargo run -p rx_bevy --example observable_keyboard_example
fn main() -> AppExit {
App::new()
.add_plugins((
DefaultPlugins,
RxPlugin,
RxSchedulerPlugin::<Update, Virtual>::default(),
))
.add_systems(Startup, setup)
.add_systems(
Update,
(
send_message(AppExit::Success).run_if(input_just_pressed(KeyCode::Escape)),
unsubscribe.run_if(input_just_pressed(KeyCode::Space)),
),
)
.run()
}
fn unsubscribe(mut example_entities: ResMut<MySubscriptions>) {
example_entities.subscription.unsubscribe();
}
#[derive(Resource)]
struct MySubscriptions {
subscription: SharedSubscription,
}
fn setup(mut commands: Commands, rx_schedule_update_virtual: RxSchedule<Update, Virtual>) {
let subscription = KeyboardObservable::new(default(), rx_schedule_update_virtual.handle())
.subscribe(PrintObserver::new("keyboard"));
commands.insert_resource(MySubscriptions {
subscription: SharedSubscription::new(subscription),
});
}
Output when pressing WASD keys and Space:
keyboard - next: KeyW
keyboard - next: KeyA
keyboard - next: KeyS
keyboard - next: KeyD
keyboard - next: Space
keyboard - unsubscribed
observable_message
The MessageObservable lets you observe global messages written by a
MessageWriter!
- Messages written in or before the observable's schedule will be observed in the same frame. If the message was written in a later schedule, it will be observed in the next frame, which could lead to one-frame flickers if something else also reacts to the same message in the same frame it was written!
See Also
- EventObservable - Observe events sent to an entity.
- KeyboardObservable - Observe global key input.
- ProxyObservable - Subscribe to another observable entity.
- ResourceObservable - Observe derived values of a resource on change.
Example
cargo run -p rx_bevy --example observable_message_example
observable_proxy
The ProxyObservable can subscribe to another observable entity of matching
type!
See Also
- EventObservable - Observe events sent to an entity.
- KeyboardObservable - Observe global key input.
- MessageObservable - Observe messages written via MessageWriter.
- ResourceObservable - Observe derived values of a resource on change.
Example
cargo run -p rx_bevy --example observable_proxy_example
let keyboard_observable_entity = commands
.spawn((
Name::new("KeyboardObservable"),
KeyboardObservable::new(default(), rx_schedule_update_virtual.handle())
.into_component(),
))
.id();
let _s = ProxyObservable::<KeyCode, Never>::new(
keyboard_observable_entity,
rx_schedule_update_virtual.handle(),
).subscribe(PrintObserver::new("proxy_observable"));
observable_resource
The ResourceObservable calls a “reader” function on a resource every
time it is added or mutated, emitting the result to subscribers.
See Also
- EventObservable - Observe events sent to an entity.
- KeyboardObservable - Observe global key input.
- MessageObservable - Observe messages written via MessageWriter.
- ProxyObservable - Subscribe to another observable entity.
Options
- trigger_on_is_added: Emit also when the resource was just added. (Default: true)
- trigger_on_is_changed: Emit on each tick where the resource was accessed mutably, except when the resource was just added. (Default: true)
Example
cargo run -p rx_bevy --example observable_resource_example
ResourceObservable::<DummyResource, _, usize>::new(
|res| res.count,
ResourceObservableOptions {
trigger_on_is_added: true,
trigger_on_is_changed: true,
},
rx_schedule_update_virtual.handle(),
)
.subscribe(PrintObserver::new("resource_observable"));
Observers
observer_fn
Define observers from callbacks: FnObserver (static dispatch) and DynFnObserver (dynamic dispatch).
See Also
- PrintObserver - Log observed signals to stdout.
- NoopObserver - Ignore all signals (panics on errors in debug).
Example
cargo run -p rx_core --example observer_fn_example
let _subscription = just("world").subscribe(FnObserver::new(
|next| println!("hello: {next}"),
|_error| println!("error"),
|| {},
));
Output:
hello: world
observer_noop
Ignore all signals; panics on errors in debug mode.
See Also
- PrintObserver - Log observed signals to stdout.
- FnObserver / DynFnObserver - Provide custom callbacks to handle signals.
Example
cargo run -p rx_core --example observer_noop_example
let _subscription = just(1).subscribe(NoopObserver::default());
Output:
*crickets*
observer_print
Print all observed signals to stdout for quick debugging.
See Also
- FnObserver / DynFnObserver - Provide custom callbacks to handle signals.
- NoopObserver - Ignore all signals (panics on errors in debug).
Example
cargo run -p rx_core --example observer_print_example
let _subscription = just(1).subscribe(PrintObserver::new("hello"));
Output:
hello - next: 1
hello - completed
hello - unsubscribed
Observers (Bevy Specific)
EntityDestination
Send observed signals to an entity as RxSignal events.
EntityDestination is an RxObserver that wraps a Bevy Entity. When signals
are observed, they are forwarded to the destination entity as events, allowing
you to react to them using a Bevy observer system.
See Also
- ResourceDestination - Write into a resource when observing signals.
Usage
fn setup(rx_schedule_update_virtual: RxSchedule<Update, Virtual>, mut commands: Commands) {
let destination_entity = commands
.spawn_empty()
.observe(|signal: On<RxSignal<i32>>| println!("Received value: {:?}", signal.event()))
.id();
// or `just(1)` if you have `observable_fn` feature enabled
let _s = JustObservable::new(1).subscribe(EntityDestination::new(
destination_entity,
rx_schedule_update_virtual.handle(),
));
}
ResourceDestination
Write into a Bevy resource when observing signals.
ResourceDestination is an RxObserver that allows you to write observed
signals directly into a Bevy Resource.
See Also
- EntityDestination - Send observed signals to an entity as events.
Usage
#[derive(Resource, Default, Debug)]
struct Counter(i32);
fn setup(rx_schedule_update_virtual: RxSchedule<Update, Virtual>) {
let _s = JustObservable::new(1).subscribe(ResourceDestination::new(
|mut counter: Mut<'_, Counter>, signal| {
println!("Received signal: {:?}", signal);
if let ObserverNotification::Next(value) = signal {
counter.0 = value;
}
println!("Counter updated to: {:?}", counter);
},
rx_schedule_update_virtual.handle(),
));
}
Operators (Core)
operator_adsr
Convert trigger signals into an ADSR envelope driven by the scheduler.
See Also
- DebounceTimeOperator - Emit the most recent value after a period of silence.
- DelayOperator - Shift emissions forward in time using the scheduler.
- FallbackWhenSilentOperator - Emit a fallback value on ticks where the source stayed silent.
- ThrottleTimeOperator - Limit the frequency of downstream emissions.
Example
cargo run -p rx_core --example operator_adsr_example
use std::time::Duration;
use rx_core::prelude::*;
use rx_core_testing::MockExecutor;
fn main() {
let mut executor = MockExecutor::default();
let scheduler = executor.get_scheduler_handle();
let envelope = AdsrEnvelope {
attack_time: Duration::from_millis(10),
decay_time: Duration::from_millis(10),
sustain_volume: 0.5,
release_time: Duration::from_millis(15),
..Default::default()
};
let mut source = PublishSubject::<AdsrTrigger>::default();
let mut subscription = source
.clone()
.adsr(
AdsrOperatorOptions {
envelope,
..Default::default()
},
scheduler.clone(),
)
.subscribe(PrintObserver::new("adsr"));
source.next(true.into());
executor.tick(Duration::from_millis(10));
executor.tick(Duration::from_millis(10));
source.next(false.into());
executor.tick(Duration::from_millis(15));
subscription.unsubscribe();
}
Output:
adsr - next: AdsrSignal { adsr_envelope_phase: Attack, phase_transition: AdsrEnvelopePhaseTransition(1), t: 0ns, value: 0.0 }
adsr - next: AdsrSignal { adsr_envelope_phase: Decay, phase_transition: AdsrEnvelopePhaseTransition(2), t: 10ms, value: 1.0 }
adsr - next: AdsrSignal { adsr_envelope_phase: None, phase_transition: AdsrEnvelopePhaseTransition(16), t: 0ns, value: 0.0 }
adsr - unsubscribed
operator_buffer_count
Collect values into fixed-size buffers before emitting them.
Example
cargo run -p rx_core --example operator_buffer_count_example
#![allow(unused)]
fn main() {
let _s = (1..=25)
.into_observable()
.buffer_count(3)
.subscribe(PrintObserver::new("buffer_count_operator"));
}
Output:
buffer_count_operator - next: [1, 2, 3]
buffer_count_operator - next: [4, 5, 6]
buffer_count_operator - next: [7, 8, 9]
buffer_count_operator - next: [10, 11, 12]
buffer_count_operator - next: [13, 14, 15]
buffer_count_operator - next: [16, 17, 18]
buffer_count_operator - next: [19, 20, 21]
buffer_count_operator - next: [22, 23, 24]
buffer_count_operator - next: [25]
buffer_count_operator - completed
buffer_count_operator - unsubscribed
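For a finite, already collected sequence, the grouping shown above matches what `slice::chunks` does in the standard library, including the shorter final buffer. The sketch below is only an analogy on slices; `buffer_count_model` is a hypothetical helper and does not involve rx_core.

```rust
/// Splits `values` into buffers of `size` items; the last buffer may be shorter.
/// Note that `chunks` panics when `size` is zero.
fn buffer_count_model(values: &[i32], size: usize) -> Vec<Vec<i32>> {
    values.chunks(size).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    let values: Vec<i32> = (1..=25).collect();
    for buffer in buffer_count_model(&values, 3) {
        println!("{buffer:?}");
    }
}
```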
operator_catch
On error, switch to a recovery observable.
See Also
- RetryOperator - Resubscribe on error up to the configured retry count.
- IntoResultOperator - Capture next/error signals as Result values.
- LiftResultOperator - Split Result values into next and error signals.
- ErrorBoundaryOperator - Enforce Never as the error type to guard pipelines at compile time.
Example
cargo run -p rx_core --example operator_catch_example
#![allow(unused)]
fn main() {
let _s = concat((
(1..=3).into_observable().map_never(),
throw("error").map_never(),
))
.map(|i| i * 10)
.catch(|_error| IteratorObservable::new(90..=92))
.subscribe(PrintObserver::new("catch"));
}
Output:
catch - next: 10
catch - next: 20
catch - next: 30
catch - next: 90
catch - next: 91
catch - next: 92
catch - completed
catch - unsubscribed
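The control flow in the output above (values pass through until the first error, then the recovery sequence takes over) can be sketched over a sequence of `Result` values. This is a std-only illustration; `catch_model` is a hypothetical helper that collects into a `Vec` and is not how rx_core implements the operator.

```rust
/// Forwards `Ok` values until the first `Err`, then appends the recovery values and stops.
fn catch_model<T, E>(
    source: impl IntoIterator<Item = Result<T, E>>,
    recover: impl Fn(E) -> Vec<T>,
) -> Vec<T> {
    let mut emitted = Vec::new();
    for item in source {
        match item {
            Ok(value) => emitted.push(value),
            Err(error) => {
                // The source is abandoned after its first error.
                emitted.extend(recover(error));
                break;
            }
        }
    }
    emitted
}

fn main() {
    let source: Vec<Result<i32, &str>> = vec![Ok(1), Ok(2), Ok(3), Err("error")];
    // The mapping is applied before the catch, so recovered values are not multiplied.
    let mapped = source.into_iter().map(|item| item.map(|i| i * 10));
    let emitted = catch_model(mapped, |_error| (90..=92).collect());
    println!("{emitted:?}");
}
```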
operator_composite
Build reusable operator chains without needing a source observable.
See Also
- IdentityOperator - A no-op operator, used mainly as the entry point of a CompositeOperator.
Example
cargo run -p rx_core --example operator_composite_operators_example
use rx_core::prelude::*;
/// Composite operators offer an easy way to create complex operators, but they
/// increase type complexity. They are good for prototyping and smaller things,
/// but otherwise you should prefer implementing an actual operator.
fn main() {
// Though not necessary, the IdentityOperator provides an easy way to define
// input types for our composite operator.
let op = IdentityOperator::<i32, Never>::default()
.map(|next: i32| next + 1)
.map(|next: i32| next * 100);
let _s = just(1).pipe(op).subscribe(PrintObserver::new("hello"));
// Or, through the type extensions, you can chain built-in operators just like on observables
let op_2 = IdentityOperator::<i32, Never>::default()
.map(|i| i * 2)
.filter(|i, _| i % 2 == 0);
let _s2 = just(1).pipe(op_2).subscribe(PrintObserver::new("bello"));
}
Output:
hello - next: 200
hello - completed
hello - unsubscribed
bello - next: 2
bello - completed
bello - unsubscribed
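The idea of building a chain first and attaching a source later is close to ordinary function composition. The sketch below shows that analogy with plain closures; `compose` is a hypothetical helper and is unrelated to how CompositeOperator is implemented in rx_core.

```rust
/// Builds a reusable pipeline from two steps without needing any input yet.
fn compose<A, B, C>(first: impl Fn(A) -> B, second: impl Fn(B) -> C) -> impl Fn(A) -> C {
    move |input| second(first(input))
}

fn main() {
    // Mirrors the `+ 1` then `* 100` chain from the example above.
    let pipeline = compose(|next: i32| next + 1, |next: i32| next * 100);
    // The same pipeline can be applied to any number of inputs afterwards.
    println!("{}", pipeline(1));
    println!("{}", pipeline(2));
}
```

As with composite operators, the returned type grows with every step added, which is one reason a dedicated implementation can be easier to name and reuse.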
operator_concat_all
Subscribes to upstream observables one at a time in order.
Example
cargo run -p rx_core --example operator_concat_all_example
let mut mock_executor = MockExecutor::new_with_logging();
let scheduler = mock_executor.get_scheduler_handle();
let mut enqueue_timer_of_length = PublishSubject::<usize>::default();
let mut _subscription = enqueue_timer_of_length
.clone()
.finalize(|| println!("finalize: upstream"))
.tap_next(|n| println!("emit (source): {n:?}"))
.map(move |next| {
interval(
IntervalObservableOptions {
duration: Duration::from_secs(1),
start_on_subscribe: false,
max_emissions_per_tick: 10,
},
scheduler.clone(),
)
.finalize(move || println!("timer of {next} finished!"))
.take(next)
.map(move |i| format!("{i} (Timer of {next})"))
})
.concat_all(Never::map_into())
.finalize(|| println!("finalize: downstream"))
.subscribe(PrintObserver::new("concat_all"));
enqueue_timer_of_length.next(4);
enqueue_timer_of_length.next(1);
enqueue_timer_of_length.next(3);
enqueue_timer_of_length.complete();
mock_executor.tick(Duration::from_secs(4));
mock_executor.tick(Duration::from_secs(1));
mock_executor.tick(Duration::from_secs(3));
Output:
emit (source): 4
emit (source): 1
emit (source): 3
Ticking... (4s)
concat_all - next: "0 (Timer of 4)"
concat_all - next: "1 (Timer of 4)"
concat_all - next: "2 (Timer of 4)"
concat_all - next: "3 (Timer of 4)"
timer of 4 finished!
Ticking... (1s)
concat_all - next: "0 (Timer of 1)"
timer of 1 finished!
Ticking... (3s)
concat_all - next: "0 (Timer of 3)"
concat_all - next: "1 (Timer of 3)"
concat_all - next: "2 (Timer of 3)"
concat_all - completed
finalize: downstream
finalize: upstream
concat_all - unsubscribed
timer of 3 finished!
end
operator_concat_map
Map each value to an inner observable and subscribe to them one at a time in order.
See Also
- ConcatAllOperator - Subscribes to upstream observables one at a time in order.
- MergeAllOperator - Subscribes to upstream observables and merges their emissions concurrently.
- SwitchAllOperator - Switch to the newest inner observable, unsubscribing previous ones.
- ExhaustAllOperator - Ignore new inner observables while one is active.
- MergeMapOperator - Map each value to an inner observable and merge their emissions concurrently.
- SwitchMapOperator - Map each value to an inner observable and switch to the latest, unsubscribing previous ones.
- ExhaustMapOperator - Map each value to an inner observable and ignore new ones while one is active.
Example
cargo run -p rx_core --example operator_concat_map_example
#[derive(Clone, Debug)]
enum Either {
Left,
Right,
}
let mut upstream_subject = PublishSubject::<Either>::default();
let mut inner_left_subject = PublishSubject::<i32>::default();
let mut inner_right_subject = PublishSubject::<i32>::default();
let l = inner_left_subject.clone();
let r = inner_right_subject.clone();
let mut _subscription = upstream_subject
.clone()
.finalize(|| println!("finalize: upstream"))
.tap_next(|n| println!("emit (source): {n:?}"))
.concat_map(
move |next| match next {
Either::Left => l.clone(),
Either::Right => r.clone(),
},
Never::map_into(),
)
.finalize(|| println!("finalize: downstream"))
.subscribe(PrintObserver::new("concat_map"));
upstream_subject.next(Either::Left);
inner_left_subject.next(1);
inner_right_subject.next(2);
inner_left_subject.next(3);
inner_right_subject.next(4);
upstream_subject.next(Either::Right);
inner_left_subject.next(5);
inner_right_subject.next(6);
inner_left_subject.next(7);
inner_right_subject.next(8);
inner_left_subject.complete();
inner_left_subject.next(9);
inner_right_subject.next(10);
inner_right_subject.complete();
upstream_subject.complete();
Output:
emit (source): Left
concat_map - next: 1
concat_map - next: 3
emit (source): Right
concat_map - next: 5
concat_map - next: 7
concat_map - next: 10
concat_map - completed
finalize: downstream
finalize: upstream
concat_map - unsubscribed
operator_count
The count operator counts upstream emissions and emits the total once
upstream completes.
See Also
- IsEmptyOperator - Emit a single boolean indicating if the source emitted anything before it had completed.
- FilterOperator - Keep values that satisfy a predicate.
- ReduceOperator - Fold values and emit only the final accumulator on completion.
Example
cargo run -p rx_core --example operator_count_example
let _subscription = (1..=6)
.into_observable()
.filter(|value, _index| value % 2 == 0)
.count()
.subscribe(PrintObserver::new("count_operator"));
Output:
count_operator - next: 3
count_operator - completed
count_operator - unsubscribed
operator_debounce_time
The debounce_time operator emits the most recent upstream value only after
the specified duration passes without another emission.
If there is no pending debounced value, upstream completion and cancellation take effect immediately. Otherwise they are deferred until the pending debounced value has been emitted.
Upstream errors are immediately propagated downstream, cancelling any pending debounced value.
See Also
- AdsrOperator - Convert trigger signals into an ADSR envelope driven by the scheduler.
- DelayOperator - Shift emissions forward in time using the scheduler.
- FallbackWhenSilentOperator - Emit a fallback value on ticks where the source stayed silent.
- ObserveOnOperator - Re-emit upstream signals with the provided scheduler.
- SubscribeOnOperator - Schedule upstream subscription on the provided scheduler.
- ThrottleTimeOperator - Limit the frequency of downstream emissions.
Example
cargo run -p rx_core --example operator_debounce_time_example
let mut executor = MockExecutor::new_with_logging();
let scheduler = executor.get_scheduler_handle();
let mut subject = PublishSubject::<usize>::default();
let _subscription = subject
.clone()
.debounce_time(Duration::from_millis(1000), scheduler)
.subscribe(PrintObserver::new("debounce_time_operator"));
subject.next(1);
executor.tick(Duration::from_millis(500));
subject.next(2);
executor.tick(Duration::from_millis(1000));
subject.complete();
Output:
Ticking... (500ms)
Ticking... (1s)
debounce_time_operator - next: 2
debounce_time_operator - completed
debounce_time_operator - unsubscribed
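The timing of the example above can be reproduced with a small plain-Rust model. This is only a sketch of the debounce rule, not the rx_core implementation: `debounce_model`, the millisecond timestamps, and the choice that a value arriving exactly at the deadline does not cancel the pending one are all assumptions made for illustration.

```rust
/// Plain-Rust model of debouncing (hypothetical helper, not part of rx_core).
/// `events` are `(timestamp_ms, value)` pairs in ascending time order.
/// Returns `(emit_time_ms, value)` for every value that survives.
fn debounce_model(events: &[(u64, i32)], quiet_ms: u64) -> Vec<(u64, i32)> {
    let mut emitted = Vec::new();
    for (i, &(time, value)) in events.iter().enumerate() {
        // A value is dropped when a newer one arrives before its quiet period ends.
        // Assumption: a value arriving exactly at the deadline does not cancel it.
        let superseded = events
            .get(i + 1)
            .map_or(false, |&(next_time, _)| next_time < time + quiet_ms);
        if !superseded {
            emitted.push((time + quiet_ms, value));
        }
    }
    emitted
}
```

Feeding it the example's timeline, `debounce_model(&[(0, 1), (500, 2)], 1000)`, leaves only the value 2, due 1000 ms after it arrived. That matches the single `next: 2` in the output.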
operator_delay
The delay operator shifts upstream values forward in time by a specified
duration.
If there are no pending delayed values, upstream completion and cancellation take effect immediately. Otherwise they are deferred until all delayed values have been emitted.
Upstream errors are immediately propagated downstream, cancelling any pending delayed values.
See Also
- AdsrOperator - Convert trigger signals into an ADSR envelope driven by the scheduler.
- DebounceTimeOperator - Emit the most recent value after a period of silence.
- FallbackWhenSilentOperator - Emit a fallback value on ticks where the source stayed silent.
- SubscribeOnOperator - Schedule upstream subscription on the provided scheduler.
- ThrottleTimeOperator - Limit the frequency of downstream emissions.
Example
cargo run -p rx_core --example operator_delay_example
let mut executor = MockExecutor::new_with_logging();
let scheduler = executor.get_scheduler_handle();
let _subscription = (1..=3)
.into_observable()
.delay(Duration::from_millis(1000), scheduler)
.subscribe(PrintObserver::new("delay_operator"));
executor.tick(Duration::from_millis(1000));
Output:
Ticking... (1s)
delay_operator - next: 1
delay_operator - next: 2
delay_operator - next: 3
delay_operator - completed
delay_operator - unsubscribed
operator_dematerialize
Convert notifications back into real signals.
See Also
- MapOperator - Transform each value with a mapping function.
- MapIntoOperator - Map each value using Into.
- MapErrorOperator - Transform error values into another error value.
- MapNeverOperator - Re-type Never signals into concrete types.
- MaterializeOperator - Turn next/error/complete into notification values.
- EnumerateOperator - Attach a running index to each emission.
- PairwiseOperator - Emit the previous and current values together.
Example
cargo run -p rx_core --example operator_dematerialize_example
let _subscription = [
ObserverNotification::<_, Never>::Next(1),
ObserverNotification::Complete,
]
.into_observable()
.dematerialize()
.subscribe(PrintObserver::new("dematerialize_operator"));
Output:
dematerialize_operator - next: 1
dematerialize_operator - completed
dematerialize_operator - unsubscribed
operator_element_at
Emit the value at the given index then complete.
If upstream completes before reaching the specified index, the operator either errors with ElementAtOperatorError::IndexOutOfRange or, if a default value was provided, emits that default.
See Also
- FindOperator - Emit the first value that matches a predicate.
- FilterOperator - Keep values that satisfy a predicate.
- FirstOperator - Emit only the first value, then complete.
Example
cargo run -p rx_core --example operator_element_at_example
let _subscription = vec!["a", "b", "c", "d", "e"]
.into_observable()
.element_at(2)
.subscribe(PrintObserver::new("element_at_operator"));
Output:
element_at_operator - next: "c"
element_at_operator - completed
element_at_operator - unsubscribed
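The example covers the in-range case only. The out-of-range rule can be sketched over a plain iterator. This is a model of the described behavior, not the crate's API: `element_at_model`, the `default` parameter, and the local `ElementAtError` enum are hypothetical stand-ins defined here so the snippet compiles on its own.

```rust
/// Hypothetical stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
enum ElementAtError {
    IndexOutOfRange,
}

/// Model of `element_at` over a finite sequence (hypothetical helper).
/// Yields the item at `index`, else the default if one was given, else an error.
fn element_at_model<T>(
    source: impl IntoIterator<Item = T>,
    index: usize,
    default: Option<T>,
) -> Result<T, ElementAtError> {
    source
        .into_iter()
        .nth(index) // `None` when the sequence ends before `index`
        .or(default)
        .ok_or(ElementAtError::IndexOutOfRange)
}
```

With a two-item source and index 2, the model returns `Err(ElementAtError::IndexOutOfRange)` when `default` is `None`, and the default value when one is supplied.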
operator_end_with
Emit a value on completion.
See Also
- StartWithOperator - Emit a value first when subscribing to the source.
Example
cargo run -p rx_core --example operator_end_with_example
let _subscription = (1..=5)
.into_observable()
.end_with(99)
.subscribe(PrintObserver::new("end_with_operator"));
Output:
end_with_operator - next: 1
end_with_operator - next: 2
end_with_operator - next: 3
end_with_operator - next: 4
end_with_operator - next: 5
end_with_operator - next: 99
end_with_operator - completed
end_with_operator - unsubscribed
operator_enumerate
Attach a running index to each emission.
See Also
- MapOperator - Transform each value with a mapping function.
- MapIntoOperator - Map each value using Into.
- MapErrorOperator - Transform error values into another error value.
- MapNeverOperator - Re-type Never signals into concrete types.
- MaterializeOperator - Turn next/error/complete into notification values.
- DematerializeOperator - Convert notifications back into real signals.
- PairwiseOperator - Emit the previous and current values together.
Example
cargo run -p rx_core --example operator_enumerate_example
let _subscription = (10..=15)
.into_observable()
.enumerate()
.subscribe(PrintObserver::new("enumerate_operator"));
Output:
enumerate_operator - next: (10, 0)
enumerate_operator - next: (11, 1)
enumerate_operator - next: (12, 2)
enumerate_operator - next: (13, 3)
enumerate_operator - next: (14, 4)
enumerate_operator - next: (15, 5)
enumerate_operator - completed
enumerate_operator - unsubscribed
operator_error_boundary
Enforce Never as the error type to guard pipelines at compile time.
See Also
- CatchOperator - On error, switch to a recovery observable.
- RetryOperator - Resubscribe on error up to the configured retry count.
- IntoResultOperator - Capture next/error signals as Result values.
- LiftResultOperator - Split Result values into next and error signals.
Example
cargo run -p rx_core --example operator_error_boundary_example
use rx_core::prelude::*;
/// The [IdentityOperator] does nothing. Its only purpose
/// is to define inputs for a [CompositeOperator]: an [Operator] that is made out
/// of other [Operator]s without having to use a [Pipe], which would require a
/// source [Observable].
fn main() {
let _s = (1..=5)
.into_observable()
.map(|i| i * 2)
.error_boundary()
.subscribe(PrintObserver::new("error_boundary_operator (composite)"));
// This cannot compile because, at the `error_boundary` operator,
// the upstream error type is not `Never`
// let _s2 = throw("error".to_string())
// .map(|i| i)
// .error_boundary()
// .subscribe(PrintObserver::new("error_boundary_operator (composite)"));
let _s3 = throw("error".to_string())
.map(|i| i)
.into_result()
.error_boundary()
.subscribe(PrintObserver::new("error_boundary_operator (composite)"));
}
error_boundary_operator (composite) - next: 2
error_boundary_operator (composite) - next: 4
error_boundary_operator (composite) - next: 6
error_boundary_operator (composite) - next: 8
error_boundary_operator (composite) - next: 10
error_boundary_operator (composite) - completed
error_boundary_operator (composite) - unsubscribed
error_boundary_operator (composite) - next: Err("error")
error_boundary_operator (composite) - unsubscribed
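The compile-time guard can be illustrated with the standard library alone. The sketch below uses `std::convert::Infallible` as a stand-in for an uninhabited error type. Whether rx_core's `Never` is related to it is not stated here, and `error_boundary_model` is a hypothetical name used only for this illustration.

```rust
use std::convert::Infallible;

/// Accepts only results whose error type has no values (hypothetical helper).
/// Passing a `Result<T, String>` here is rejected by the compiler, which is the
/// same kind of guarantee the operator gives for a whole pipeline.
fn error_boundary_model<T>(result: Result<T, Infallible>) -> T {
    match result {
        Ok(value) => value,
        // `Infallible` has no values, so this arm can never run.
        Err(never) => match never {},
    }
}
```

Capturing a fallible value as data first, as `into_result` does in the example, makes it acceptable again: a `Result<Result<i32, String>, Infallible>` passes through and yields the inner `Result`.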
operator_exhaust_all
Example
cargo run -p rx_core --example operator_exhaust_all_example
operator_exhaust_map
Map each value to an inner observable and ignore new ones while one is active.
See Also
- ConcatAllOperator - Subscribes to upstream observables one at a time in order.
- MergeAllOperator - Subscribes to upstream observables and merges their emissions concurrently.
- SwitchAllOperator - Switch to the newest inner observable, unsubscribing previous ones.
- ExhaustAllOperator - Ignore new inner observables while one is active.
- ConcatMapOperator - Map each value to an inner observable and subscribe to them one at a time in order.
- MergeMapOperator - Map each value to an inner observable and merge their emissions concurrently.
- SwitchMapOperator - Map each value to an inner observable and switch to the latest, unsubscribing previous ones.
Example
cargo run -p rx_core --example operator_exhaust_map_example
use std::time::Duration;
use rx_core::prelude::*;
use rx_core_testing::MockExecutor;
fn main() {
let mut executor = MockExecutor::new_with_logging();
let scheduler = executor.get_scheduler_handle();
let mut source = PublishSubject::<i32>::default();
let mut subscription = source
.clone()
.exhaust_map(
move |next| {
println!("Trying to switch to the {}. inner observable..", next);
interval(
IntervalObservableOptions {
duration: Duration::from_millis(1000),
max_emissions_per_tick: 10,
start_on_subscribe: false,
},
scheduler.clone(),
)
.take(3)
},
Never::map_into(),
)
.subscribe(PrintObserver::new("exhaust_map"));
source.next(1);
executor.tick(Duration::from_millis(1000));
executor.tick(Duration::from_millis(1000));
source.next(2); // Ignored while an inner observable is active
executor.tick(Duration::from_millis(1000));
source.next(3); // Switches after the previous inner completes
source.next(4); // Ignored because the new inner just started
executor.tick(Duration::from_millis(1000));
executor.tick(Duration::from_millis(1000));
source.complete();
executor.tick(Duration::from_millis(1000));
source.unsubscribe();
println!("end");
subscription.unsubscribe();
}
Output:
Trying to switch to the 1. inner observable..
Ticking... (1s)
exhaust_map - next: 0
Ticking... (1s)
exhaust_map - next: 1
Trying to switch to the 2. inner observable..
Ticking... (1s)
exhaust_map - next: 2
Trying to switch to the 3. inner observable..
Trying to switch to the 4. inner observable..
Ticking... (1s)
exhaust_map - next: 0
Ticking... (1s)
exhaust_map - next: 1
Ticking... (1s)
exhaust_map - next: 2
exhaust_map - completed
exhaust_map - unsubscribed
end
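Why values 2 and 4 never start an inner observable can be seen in a small state-machine model. This is a plain-Rust sketch of the exhaust rule under simplifying assumptions, not the operator's implementation: `ExhaustEvent` and `exhaust_map_model` are hypothetical names, and every inner sequence is assumed to emit one index per tick and to finish after `take` values.

```rust
#[derive(Clone, Copy, Debug)]
enum ExhaustEvent {
    /// Upstream emits a value that would start a new inner sequence.
    Source,
    /// The scheduler advances by one inner interval period.
    Tick,
}

/// Model of `exhaust_map` (hypothetical helper). Each inner sequence emits
/// 0, 1, .., take - 1, one value per tick.
fn exhaust_map_model(events: &[ExhaustEvent], take: usize) -> Vec<usize> {
    // `Some(i)` while an inner sequence is active; `i` is its next index.
    let mut next_index: Option<usize> = None;
    let mut out = Vec::new();
    for event in events {
        match event {
            ExhaustEvent::Source => {
                // Upstream values are ignored while an inner sequence is active.
                if next_index.is_none() && take > 0 {
                    next_index = Some(0);
                }
            }
            ExhaustEvent::Tick => {
                if let Some(i) = next_index {
                    out.push(i);
                    // The inner sequence completes after its last value.
                    next_index = if i + 1 < take { Some(i + 1) } else { None };
                }
            }
        }
    }
    out
}
```

Replaying the example as Source, Tick, Tick, Source, Tick, Source, Source, Tick, Tick, Tick with `take = 3` yields 0, 1, 2, 0, 1, 2: the second and fourth source values arrive while an inner sequence is still active and are dropped.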
operator_fallback_when_silent
Emit a fallback value on ticks where the source stayed silent.
See Also
- AdsrOperator - Convert trigger signals into an ADSR envelope driven by the scheduler.
- DebounceTimeOperator - Emit the most recent value after a period of silence.
- DelayOperator - Shift emissions forward in time using the scheduler.
- ThrottleTimeOperator - Limit the frequency of downstream emissions.
Example
cargo run -p rx_core --example operator_fallback_when_silent_example
use std::time::Duration;
use rx_core::prelude::*;
use rx_core_testing::MockExecutor;
fn main() {
let mut executor = MockExecutor::default();
let scheduler = executor.get_scheduler_handle();
let mut subject = PublishSubject::<i32>::default();
let mut subscription = subject
.clone()
.fallback_when_silent(|_, _, _| Default::default(), scheduler)
.subscribe(PrintObserver::<i32>::new("fallback_when_silent"));
subject.next(1);
executor.tick(Duration::from_millis(200));
subject.next(2);
executor.tick(Duration::from_millis(200));
// Silence
executor.tick(Duration::from_millis(200));
subject.next(3);
executor.tick(Duration::from_millis(200));
subscription.unsubscribe();
}
fallback_when_silent - next: 1
fallback_when_silent - next: 2
fallback_when_silent - next: 0
fallback_when_silent - next: 3
fallback_when_silent - unsubscribed
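The per-tick rule behind this output can be modelled in a few lines of plain Rust. This is a sketch under assumptions, not the operator itself: `fallback_when_silent_model` is a hypothetical helper, the source's emissions are grouped by the tick that follows them, and the fallback is a fixed value rather than the three-argument closure used above.

```rust
/// Model of `fallback_when_silent` (hypothetical helper).
/// `ticks[i]` holds the values the source emitted before tick `i` ran.
fn fallback_when_silent_model(ticks: &[Vec<i32>], fallback: i32) -> Vec<i32> {
    let mut out = Vec::new();
    for emitted in ticks {
        if emitted.is_empty() {
            // The source stayed silent for this tick, so the fallback is emitted.
            out.push(fallback);
        } else {
            // The source's own values pass through unchanged.
            out.extend(emitted.iter().copied());
        }
    }
    out
}
```

For the example, the ticks see `[1]`, `[2]`, `[]` and `[3]`, and the fallback is `i32::default()`, which is 0.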
operator_filter
Keep values that satisfy a predicate.
See Also
- FilterMapOperator - Map values to an Option and keep only the Some values.
- TakeOperator - Emit only the first n values, then complete.
- SkipOperator - Drop the first n values.
- LiftOptionOperator - Filter out None and forward Some values.
Example
cargo run -p rx_core --example operator_filter_example
let _subscription = (1..=5)
.into_observable()
.map(|next: i32| next + 1)
.filter(|i, _| i > &2)
.subscribe(PrintObserver::new("filter_operator"));
Output:
filter_operator - next: 3
filter_operator - next: 4
filter_operator - next: 5
filter_operator - next: 6
filter_operator - completed
filter_operator - unsubscribed
operator_filter_map
Map values to an Option and keep only the Some values.
See Also
- FilterOperator - Keep values that satisfy a predicate.
- TakeOperator - Emit only the first n values, then complete.
- SkipOperator - Drop the first n values.
- LiftOptionOperator - Filter out None and forward Some values.
Example
cargo run -p rx_core --example operator_filter_map_example
let _subscription = (1..=5)
.into_observable()
.filter_map(|i| if i % 2 == 0 { Some(i) } else { None })
.subscribe(PrintObserver::new("filter_map_operator"));
Output:
filter_map_operator - next: 2
filter_map_operator - next: 4
filter_map_operator - completed
filter_map_operator - unsubscribed
operator_finalize
Execute cleanup when the observable finishes or unsubscribes.
See Also
- TapOperator - Mirror all signals into another observer.
- TapNextOperator - Run a callback for each next value while letting signals pass through.
- OnNextOperator - Invoke a callback for each value that can also decide whether to forward it.
- OnSubscribeOperator - Run a callback when a subscription is established.
Example
cargo run -p rx_core --example operator_finalize_completion_example
let _subscription = just(12)
.finalize(|| println!("finally!"))
.subscribe(PrintObserver::new("finalize_operator"));
Output:
finalize_operator - next: 12
finalize_operator - completed
finally!
finalize_operator - unsubscribed
operator_find
Emit the first value that matches a predicate.
See Also
- FilterOperator - Keep values that satisfy a predicate.
- FindIndexOperator - Emit the index of the first value that matches a predicate.
- ElementAtOperator - Emit the value at the given index then complete.
- FirstOperator - Emit only the first value, then complete.
Example
cargo run -p rx_core --example operator_find_example
let _subscription = (1..=5)
.into_observable()
.find(|i| i % 2 == 0)
.subscribe(PrintObserver::new("find_operator"));
Output:
find_operator - next: 2
find_operator - completed
find_operator - unsubscribed
operator_find_index
Emit the index of the first value that matches a predicate.
See Also
- FindOperator - Emit the first value that matches a predicate.
- ElementAtOperator - Emit the value at the given index then complete.
- FilterOperator - Keep values that satisfy a predicate.
- FirstOperator - Emit only the first value, then complete.
Example
cargo run -p rx_core --example operator_find_index_example
let _subscription = (1..=5)
.into_observable()
.find_index(|i| i % 2 == 0)
.subscribe(PrintObserver::new("find_index_operator"));
Output:
find_index_operator - next: 1
find_index_operator - completed
find_index_operator - unsubscribed
operator_first
Emit only the first value, then complete.
See Also
- TakeOperator - Emit only the first n values, then complete.
- FindOperator - Emit the first value that matches a predicate.
- ElementAtOperator - Emit the value at the given index then complete.
- FindIndexOperator - Emit the index of the first value that matches a predicate.
Example
cargo run -p rx_core --example operator_first_example
let _subscription = (1..=5)
.into_observable()
.first()
.subscribe(PrintObserver::new("first_operator"));
Output:
first_operator - next: 1
first_operator - completed
first_operator - unsubscribed
operator_identity
The identity operator is a no-op operator.
In layman’s terms: speedy thing goes in, speedy thing comes out.
It is used to conveniently define the input types of a composite operator.
This is why only this operator has a standalone compose_operator function and
has no Observable extension methods.
Example
cargo run -p rx_core --example operator_identity_example
// If it existed, this would be the same as: `just(1).identity().subscribe(...)`
let _s = IdentityOperator::default()
.operate(just(1))
.subscribe(PrintObserver::new("identity_operator"));
identity_operator - next: 1
identity_operator - completed
identity_operator - unsubscribed
Example (Composite)
cargo run -p rx_core --example operator_identity_composite_example
let composite_operator = compose_operator::<i32, Never>()
.map(|i| i + 1)
.filter(|i, _| i < &4);
let _s = (1..=5)
.into_observable()
.pipe(composite_operator)
.subscribe(PrintObserver::new("identity_operator (composite)"));
identity_operator (composite) - next: 2
identity_operator (composite) - next: 3
identity_operator (composite) - completed
identity_operator (composite) - unsubscribed
operator_into_result
Error handling operator. Captures upstream values and errors, and forwards them
downstream as a Result.
See Also
- CatchOperator - On error, switch to a recovery observable.
- RetryOperator - Resubscribe on error up to the configured retry count.
- LiftResultOperator - Split Result values into next and error signals.
- ErrorBoundaryOperator - Enforce Never as the error type to guard pipelines at compile time.
Example
cargo run -p rx_core --example operator_into_result_example
let _s = throw("error!".to_string())
.into_result()
.subscribe(PrintObserver::new("into_result_operator - throw"));
let _s = just(1)
.into_result()
.subscribe(PrintObserver::new("into_result_operator - just"));
into_result_operator - throw - next: Err("error!")
into_result_operator - throw - unsubscribed
into_result_operator - just - next: Ok(1)
into_result_operator - just - completed
into_result_operator - just - unsubscribed
operator_is_empty
The is_empty operator will emit a single boolean value indicating whether
upstream emitted any items before completing:
- If the upstream completes without emitting any items, is_empty will emit true and then complete.
- If the upstream emits any items, is_empty will immediately emit false and complete.
Example
cargo run -p rx_core --example operator_is_empty_example
let _s = (1..=5)
.into_observable()
.is_empty() // Will stop the iterator, send `false`, then complete.
.subscribe(PrintObserver::new("is_empty_operator - iterator"));
let _s = empty() // Immediately completes.
.is_empty()
.subscribe(PrintObserver::new("is_empty_operator - empty"));
Output:
is_empty_operator - iterator - next: false
is_empty_operator - iterator - completed
is_empty_operator - iterator - unsubscribed
is_empty_operator - empty - next: true
is_empty_operator - empty - completed
is_empty_operator - empty - unsubscribed
operator_lift_option
Filter out None and forward Some values.
See Also
- FilterOperator - Keep values that satisfy a predicate.
- FilterMapOperator - Map values to an Option and keep only the Some values.
- TakeOperator - Emit only the first n values, then complete.
- SkipOperator - Drop the first n values.
Example
cargo run -p rx_core --example operator_lift_option_example
let _subscription = (1..=5)
.into_observable()
.map(|i| if i % 2 == 0 { Some(i) } else { None })
.lift_option()
.subscribe(PrintObserver::new("lift_option_operator"));
Output:
lift_option_operator - next: 2
lift_option_operator - next: 4
lift_option_operator - completed
lift_option_operator - unsubscribed
operator_lift_result
Split Result values into next and error signals.
See Also
- CatchOperator - On error, switch to a recovery observable.
- RetryOperator - Resubscribe on error up to the configured retry count.
- IntoResultOperator - Capture next/error signals as Result values.
- ErrorBoundaryOperator - Enforce Never as the error type to guard pipelines at compile time.
Example
cargo run -p rx_core --example operator_lift_result_example
use rx_core::prelude::*;
fn main() {
let _s = (1..=5)
.into_observable()
.map(|i| {
if i <= 3 {
Result::<i32, String>::Ok(i)
} else {
Result::<i32, String>::Err("Larger than 3!".to_string())
}
})
// Lift `Err` values out of the "next" channel and into the error channel.
// The upstream here cannot error, so there are no upstream errors to deal with.
.lift_result()
.subscribe(PrintObserver::new("lift_result_operator"));
}
lift_result_operator - next: 1
lift_result_operator - next: 2
lift_result_operator - next: 3
lift_result_operator - error: "Larger than 3!"
lift_result_operator - unsubscribed
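The output shows that values 4 and 5 are never observed: the first `Err` becomes the error signal and ends the subscription. A plain-Rust model of that rule is sketched below. `lift_result_model` is a hypothetical helper over an iterator, not the operator's implementation, and it returns the forwarded values together with the terminating error, if any.

```rust
/// Model of `lift_result` over a finite sequence (hypothetical helper).
/// Returns the values forwarded as `next` and the error that ended the sequence.
fn lift_result_model<T, E>(
    items: impl IntoIterator<Item = Result<T, E>>,
) -> (Vec<T>, Option<E>) {
    let mut nexts = Vec::new();
    for item in items {
        match item {
            Ok(value) => nexts.push(value),
            // The first `Err` is the error signal; nothing after it is forwarded.
            Err(error) => return (nexts, Some(error)),
        }
    }
    (nexts, None)
}
```

Applied to the example's mapping over `1..=5`, the model returns `([1, 2, 3], Some("Larger than 3!"))`.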
operator_map
Transform each value with a mapping function.
See Also
- MapIntoOperator - Map each value using Into.
- MapErrorOperator - Transform error values into another error value.
- MapNeverOperator - Re-type Never signals into concrete types.
- MaterializeOperator - Turn next/error/complete into notification values.
- DematerializeOperator - Convert notifications back into real signals.
- EnumerateOperator - Attach a running index to each emission.
- PairwiseOperator - Emit the previous and current values together.
Example
cargo run -p rx_core --example operator_map_example
let _subscription = (1..=5)
.into_observable()
.map(|i| i * 2)
.subscribe(PrintObserver::new("map_operator"));
Output:
map_operator - next: 2
map_operator - next: 4
map_operator - next: 6
map_operator - next: 8
map_operator - next: 10
map_operator - completed
map_operator - unsubscribed
operator_map_error
Transform error values into another error value.
See Also
- MapOperator - Transform each value with a mapping function.
- MapIntoOperator - Map each value using Into.
- MapNeverOperator - Re-type Never signals into concrete types.
- MaterializeOperator - Turn next/error/complete into notification values.
- DematerializeOperator - Convert notifications back into real signals.
- EnumerateOperator - Attach a running index to each emission.
- PairwiseOperator - Emit the previous and current values together.
Example
cargo run -p rx_core --example operator_map_error_example
let _subscription = concat((
(1..=5)
.into_observable()
.map_error(Never::map_into::<&'static str>()),
throw("error").map(Never::map_into::<usize>()),
))
.skip(1)
.map_error(|error| format!("error? {error}"))
.subscribe(PrintObserver::new("map_error_operator"));
Output:
map_error_operator - next: 1
map_error_operator - next: 2
map_error_operator - next: 3
map_error_operator - next: 4
map_error_operator - next: 5
map_error_operator - error: "error? error"
map_error_operator - unsubscribed
operator_map_into
Map each value using Into.
See Also
- MapOperator - Transform each value with a mapping function.
- MapErrorOperator - Transform error values into another error value.
- MapNeverOperator - Re-type Never signals into concrete types.
- MaterializeOperator - Turn next/error/complete into notification values.
- DematerializeOperator - Convert notifications back into real signals.
- EnumerateOperator - Attach a running index to each emission.
- PairwiseOperator - Emit the previous and current values together.
Example
cargo run -p rx_core --example operator_map_into_example
#[derive(Debug)]
pub struct Foo(pub i32);
impl From<i32> for Foo {
fn from(value: i32) -> Self {
Foo(value)
}
}
let _subscription = (1..=5)
.into_observable()
.map_into()
.subscribe(PrintObserver::<Foo>::new("into_operator"));
Output:
into_operator - next: Foo(1)
into_operator - next: Foo(2)
into_operator - next: Foo(3)
into_operator - next: Foo(4)
into_operator - next: Foo(5)
into_operator - completed
into_operator - unsubscribed
operator_map_never
Re-type Never next/error channels into concrete types.
See Also
- MapOperator - Transform each value with a mapping function.
- MapIntoOperator - Map each value using Into.
- MapErrorOperator - Transform error values into another error value.
- MaterializeOperator - Turn next/error/complete into notification values.
- DematerializeOperator - Convert notifications back into real signals.
- EnumerateOperator - Attach a running index to each emission.
- PairwiseOperator - Emit the previous and current values together.
Example
cargo run -p rx_core --example operator_map_never_example
let _subscription_error = throw("error")
.map_never()
.subscribe(PrintObserver::<i32, &'static str>::new("map_never (next)"));
let _subscription_next = just(1)
.map_never()
.subscribe(PrintObserver::<i32, &'static str>::new("map_never (error)"));
let _subscription_both = empty()
.map_never_both()
.subscribe(PrintObserver::<i32, &'static str>::new("map_never_both"));
Output:
map_never (next) - error: "error"
map_never (next) - unsubscribed
map_never (error) - next: 1
map_never (error) - completed
map_never (error) - unsubscribed
map_never_both - completed
map_never_both - unsubscribed
operator_materialize
Turn next/error/complete into notification values.
See Also
- MapOperator - Transform each value with a mapping function.
- MapIntoOperator - Map each value using Into.
- MapErrorOperator - Transform error values into another error value.
- MapNeverOperator - Re-type Never signals into concrete types.
- DematerializeOperator - Convert notifications back into real signals.
- EnumerateOperator - Attach a running index to each emission.
- PairwiseOperator - Emit the previous and current values together.
Example
cargo run -p rx_core --example operator_materialize_example
let _subscription = (1..=5)
.into_observable()
.materialize()
.subscribe(PrintObserver::new("materialize_operator"));
Output:
materialize_operator - next: Next(1)
materialize_operator - next: Next(2)
materialize_operator - next: Next(3)
materialize_operator - next: Next(4)
materialize_operator - next: Next(5)
materialize_operator - next: Complete
materialize_operator - completed
materialize_operator - unsubscribed
operator_merge_all
Example
cargo run -p rx_core --example operator_merge_all_example
#[derive(Clone, Debug)]
enum Either {
Left,
Right,
}
let mut upstream_subject = PublishSubject::<Either>::default();
let mut inner_left_subject = PublishSubject::<i32>::default();
let mut inner_right_subject = PublishSubject::<i32>::default();
let l = inner_left_subject.clone();
let r = inner_right_subject.clone();
let mut _subscription = upstream_subject
.clone()
.finalize(|| println!("finalize: upstream"))
.tap_next(|n| println!("emit (source): {n:?}"))
.map(move |next| match next {
Either::Left => l.clone(),
Either::Right => r.clone(),
})
.merge_all(usize::MAX, Never::map_into())
.finalize(|| println!("finalize: downstream"))
.subscribe(PrintObserver::new("merge_map"));
upstream_subject.next(Either::Left);
inner_left_subject.next(1);
inner_right_subject.next(2);
inner_left_subject.next(3);
inner_right_subject.next(4);
upstream_subject.next(Either::Right);
inner_left_subject.next(5);
inner_right_subject.next(6);
inner_left_subject.next(7);
inner_right_subject.next(8);
inner_left_subject.complete();
inner_left_subject.next(9);
inner_right_subject.next(10);
inner_right_subject.complete();
upstream_subject.complete();
Output:
emit (source): Left
merge_map - next: 1
merge_map - next: 3
emit (source): Right
merge_map - next: 5
merge_map - next: 6
merge_map - next: 7
merge_map - next: 8
merge_map - next: 10
merge_map - completed
finalize: downstream
finalize: upstream
merge_map - unsubscribed
operator_merge_map
Map each value to an inner observable and merge their emissions concurrently.
See Also
- ConcatAllOperator - Subscribes to upstream observables one at a time in order.
- MergeAllOperator - Subscribes to upstream observables and merges their emissions concurrently.
- SwitchAllOperator - Switch to the newest inner observable, unsubscribing previous ones.
- ExhaustAllOperator - Ignore new inner observables while one is active.
- ConcatMapOperator - Map each value to an inner observable and subscribe to them one at a time in order.
- SwitchMapOperator - Map each value to an inner observable and switch to the latest, unsubscribing previous ones.
- ExhaustMapOperator - Map each value to an inner observable and ignore new ones while one is active.
Example
cargo run -p rx_core --example operator_merge_map_example
#[derive(Clone, Debug)]
enum Either {
Left,
Right,
}
let mut upstream_subject = PublishSubject::<Either>::default();
let mut inner_left_subject = PublishSubject::<i32>::default();
let mut inner_right_subject = PublishSubject::<i32>::default();
let l = inner_left_subject.clone();
let r = inner_right_subject.clone();
let mut _subscription = upstream_subject
.clone()
.finalize(|| println!("finalize: upstream"))
.tap_next(|n| println!("emit (source): {n:?}"))
.merge_map(
move |next| match next {
Either::Left => l.clone(),
Either::Right => r.clone(),
},
usize::MAX,
Never::map_into(),
)
.finalize(|| println!("finalize: downstream"))
.subscribe(PrintObserver::new("merge_map"));
upstream_subject.next(Either::Left);
inner_left_subject.next(1);
inner_right_subject.next(2);
inner_left_subject.next(3);
inner_right_subject.next(4);
upstream_subject.next(Either::Right);
inner_left_subject.next(5);
inner_right_subject.next(6);
inner_left_subject.next(7);
inner_right_subject.next(8);
inner_left_subject.complete();
inner_left_subject.next(9);
inner_right_subject.next(10);
inner_right_subject.complete();
upstream_subject.complete();
Output:
emit (source): Left
merge_map - next: 1
merge_map - next: 3
emit (source): Right
merge_map - next: 5
merge_map - next: 6
merge_map - next: 7
merge_map - next: 8
merge_map - next: 10
merge_map - completed
finalize: downstream
finalize: upstream
merge_map - unsubscribed
operator_observe_on
The observe_on operator re-emits upstream next signals on the provided
scheduler.
Upstream completion and cancellation happen immediately when there are no pending scheduled values; otherwise they are deferred until the scheduled work drains.
Upstream errors are forwarded immediately; any pending scheduled values are skipped because downstream closes.
See Also
- DebounceTimeOperator - Emit the most recent value after a period of silence.
- DelayOperator - Shift emissions forward in time using the scheduler.
- SubscribeOnOperator - Schedule upstream subscription on the provided scheduler.
- ThrottleTimeOperator - Limit the frequency of downstream emissions.
Example
cargo run -p rx_core --example operator_observe_on_example
use std::time::Duration;
use rx_core::prelude::*;
use rx_core_testing::MockExecutor;
fn main() {
let mut executor = MockExecutor::new_with_logging();
let scheduler = executor.get_scheduler_handle();
let _subscription = (1..=3)
.into_observable()
.observe_on(scheduler)
.subscribe(PrintObserver::new("observe_on_operator"));
executor.tick(Duration::from_millis(1));
}
Output:
Ticking... (0ns)
observe_on_operator - next: 1
observe_on_operator - next: 2
observe_on_operator - next: 3
observe_on_operator - completed
observe_on_operator - unsubscribed
operator_subscribe_on
The subscribe_on operator schedules the subscription to the upstream
observable on the provided scheduler.
This only affects when the upstream subscription starts. It does not alter
when upstream next, error, or complete signals are emitted.
The subscription can be delayed with subscribe_on_with_delay.
See Also
- DebounceTimeOperator - Emit the most recent value after a period of silence.
- DelayOperator - Shift emissions forward in time using the scheduler.
- ObserveOnOperator - Re-emit upstream signals with the provided scheduler.
- ThrottleTimeOperator - Limit the frequency of downstream emissions.
- RetryOperator - Resubscribe on error up to the configured retry count.
Example
cargo run -p rx_core --example operator_subscribe_on_example
use std::time::Duration;
use rx_core::prelude::*;
use rx_core_testing::MockExecutor;
fn main() {
let mut executor = MockExecutor::new_with_logging();
let scheduler = executor.get_scheduler_handle();
let _subscription = (1..=3)
.into_observable()
.subscribe_on(scheduler)
.subscribe(PrintObserver::new("subscribe_on_operator"));
executor.tick(Duration::from_millis(0));
}
Output:
Ticking... (0ns)
subscribe_on_operator - next: 1
subscribe_on_operator - next: 2
subscribe_on_operator - next: 3
subscribe_on_operator - completed
subscribe_on_operator - unsubscribed
operator_throttle_time
The throttle_time operator limits the frequency of downstream emissions by
emitting an upstream value, then suppressing subsequent emissions until the
duration elapses.
When the output is set to LeadingOnly, the first upstream value in a throttle
window is emitted immediately. When the output is set to TrailingOnly, the
most recent upstream value observed during the throttle window is emitted when
it ends. The default LeadingAndTrailing setting emits both the first and the
most recent values in each throttle window.
Upstream completion and cancellation happen immediately if there is no pending trailing value; otherwise they are deferred until the trailing value has been emitted.
Upstream errors are immediately propagated downstream, cancelling any pending throttled value.
Options
Use ThrottleTimeOptions to configure duration and output behavior.
- duration: The throttle window duration. Default: 1s.
- output: Controls which emissions are produced in each throttle window. Default: ThrottleOutputBehavior::LeadingAndTrailing. Possible values: ThrottleOutputBehavior::LeadingOnly, ThrottleOutputBehavior::TrailingOnly, ThrottleOutputBehavior::LeadingAndTrailing.
See Also
- AdsrOperator - Convert trigger signals into an ADSR envelope driven by the scheduler.
- DebounceTimeOperator - Emit the most recent value after a period of silence.
- DelayOperator - Shift emissions forward in time using the scheduler.
- FallbackWhenSilentOperator - Emit a fallback value on ticks where the source stayed silent.
- ObserveOnOperator - Re-emit upstream signals with the provided scheduler.
- SubscribeOnOperator - Schedule upstream subscription on the provided scheduler.
Example
cargo run -p rx_core --example operator_throttle_time_example
let mut executor = MockExecutor::new_with_logging();
let scheduler = executor.get_scheduler_handle();
let mut subject = PublishSubject::<usize>::default();
let _subscription = interval(
IntervalObservableOptions {
duration: Duration::from_millis(1),
max_emissions_per_tick: 1000,
..Default::default()
},
scheduler.clone(),
)
.throttle_time(
ThrottleTimeOptions::new(Duration::from_millis(500)),
scheduler,
)
.subscribe(PrintObserver::new("throttle_time_operator"));
for _ in 0..10 {
executor.tick(Duration::from_millis(100));
}
Output:
Ticking... (100ms)
throttle_time_operator - next: 0
Ticking... (100ms)
Ticking... (100ms)
Ticking... (100ms)
Ticking... (100ms)
throttle_time_operator - next: 499
Ticking... (100ms)
Ticking... (100ms)
Ticking... (100ms)
Ticking... (100ms)
Ticking... (100ms)
throttle_time_operator - next: 999
throttle_time_operator - unsubscribed
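To make the LeadingOnly behavior described above more concrete, here is a minimal, framework-free sketch of a throttle gate. It is not how rx_core implements throttle_time: LeadingThrottle, accept and leading_emission_times are hypothetical names, timestamps are passed in by hand instead of coming from a scheduler, and treating a value that arrives exactly at the end of a window as the start of the next window is an assumption.

```rust
use std::time::Duration;

/// Hypothetical leading-only throttle gate (not part of rx_core).
pub struct LeadingThrottle {
    window: Duration,
    /// End of the currently open throttle window, if any.
    window_end: Option<Duration>,
}

impl LeadingThrottle {
    pub fn new(window: Duration) -> Self {
        Self {
            window,
            window_end: None,
        }
    }

    /// Returns true when a value observed at `now` should be emitted.
    pub fn accept(&mut self, now: Duration) -> bool {
        match self.window_end {
            // Still inside the open window: suppress the value.
            Some(end) if now < end => false,
            // No window is open, or it has elapsed: emit and open a new one.
            _ => {
                self.window_end = Some(now + self.window);
                true
            }
        }
    }
}

/// Feeds one value per millisecond for `total_ms` milliseconds and returns
/// the timestamps (in ms) of the values that pass the gate.
pub fn leading_emission_times(window_ms: u64, total_ms: u64) -> Vec<u64> {
    let mut gate = LeadingThrottle::new(Duration::from_millis(window_ms));
    (1..=total_ms)
        .filter(|ms| gate.accept(Duration::from_millis(*ms)))
        .collect()
}
```

The TrailingOnly and LeadingAndTrailing modes additionally need to remember the most recent suppressed value and emit it when the window ends, which requires a scheduler and is left out of this sketch.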
operator_on_next
Invoke a callback for each value that can also decide whether to forward it.
- Returning true allows the value to be forwarded to the destination observer.
- Returning false prevents the value from being forwarded.
Like filter, but with access to the destination observer!
See Also
- TapOperator - Mirror all signals into another observer.
- TapNextOperator - Run a callback for each next value while letting signals pass through.
- OnSubscribeOperator - Run a callback when a subscription is established.
- FinalizeOperator - Execute cleanup when the observable finishes or unsubscribes.
Example
cargo run -p rx_core --example operator_on_next_example
let _subscription = (1..=5)
.into_observable()
.on_next(|next, destination| {
destination.next(next * 99);
true
})
.subscribe(PrintObserver::new("on_next_operator"));
Output:
on_next_operator - next: 99
on_next_operator - next: 1
on_next_operator - next: 198
on_next_operator - next: 2
on_next_operator - next: 297
on_next_operator - next: 3
on_next_operator - next: 396
on_next_operator - next: 4
on_next_operator - next: 495
on_next_operator - next: 5
on_next_operator - completed
on_next_operator - unsubscribed
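The interplay between the callback's return value and its access to the destination can be modelled without the library. The sketch below is an illustration only: on_next_sketch is a hypothetical function, a plain Vec stands in for the destination observer, and completion and unsubscription are not modelled.

```rust
/// Hypothetical stand-in for `on_next`: the callback sees each value and the
/// destination, and its return value decides whether the value is forwarded.
pub fn on_next_sketch<F>(source: impl IntoIterator<Item = i32>, mut callback: F) -> Vec<i32>
where
    F: FnMut(i32, &mut Vec<i32>) -> bool,
{
    // A plain Vec plays the role of the destination observer.
    let mut destination = Vec::new();
    for value in source {
        // The callback may push extra values before the original one.
        if callback(value, &mut destination) {
            destination.push(value);
        }
    }
    destination
}
```

Calling it with a callback that pushes `next * 99` and returns true yields the same interleaving as the output above, while a callback that only returns a boolean behaves like a filter.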
operator_on_subscribe
Run a callback when a subscription is established.
See Also
- TapOperator - Mirror all signals into another observer.
- TapNextOperator - Run a callback for each next value while letting signals pass through.
- OnNextOperator - Invoke a callback for each value that can also decide whether to forward it.
- FinalizeOperator - Execute cleanup when the observable finishes or unsubscribes.
Example
cargo run -p rx_core --example operator_on_subscribe_example
let _subscription = (1..=5)
.into_observable()
.on_subscribe(|destination| destination.next(99))
.subscribe(PrintObserver::new("on_subscribe_operator"));
Output:
on_subscribe_operator - next: 99
on_subscribe_operator - next: 1
on_subscribe_operator - next: 2
on_subscribe_operator - next: 3
on_subscribe_operator - next: 4
on_subscribe_operator - next: 5
on_subscribe_operator - completed
on_subscribe_operator - unsubscribed
operator_pairwise
Emit the previous and current values together.
See Also
- MapOperator - Transform each value with a mapping function.
- MapIntoOperator - Map each value using Into.
- MapErrorOperator - Transform error values into another error value.
- MapNeverOperator - Re-type Never signals into concrete types.
- MaterializeOperator - Turn next/error/complete into notification values.
- DematerializeOperator - Convert notifications back into real signals.
- EnumerateOperator - Attach a running index to each emission.
Example
cargo run -p rx_core --example operator_pairwise_example
let _subscription = (1..=4)
.into_observable()
.pairwise()
.subscribe(PrintObserver::new("pairwise_operator"));
Output:
pairwise_operator - next: [1, 2]
pairwise_operator - next: [2, 3]
pairwise_operator - next: [3, 4]
pairwise_operator - completed
pairwise_operator - unsubscribed
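The pairing logic itself is small: remember the previous value and emit it together with the current one, starting from the second value. A minimal std-only sketch follows, assuming nothing about how rx_core implements it; pairwise_sketch is a hypothetical helper that works on an iterator instead of an observable.

```rust
/// Hypothetical iterator-based model of `pairwise` (not rx_core code).
pub fn pairwise_sketch<T: Clone>(values: impl IntoIterator<Item = T>) -> Vec<[T; 2]> {
    let mut previous: Option<T> = None;
    let mut pairs = Vec::new();
    for current in values {
        // `replace` stores the current value and hands back the previous one.
        if let Some(prev) = previous.replace(current.clone()) {
            pairs.push([prev, current]);
        }
    }
    pairs
}
```

A source with fewer than two values produces no pairs in this model, because the first value only primes the stored state.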
operator_reduce
Fold values and emit only the final accumulator on completion.
See Also
- ScanOperator - Accumulate state and emit every intermediate result.
Example
cargo run -p rx_core --example operator_reduce_example
let _subscription = (0..=10)
.into_observable()
.reduce(|acc, next| acc + next, 0)
.subscribe(PrintObserver::new("reduce_operator"));
Output:
reduce_operator - next: 55
reduce_operator - completed
reduce_operator - unsubscribed
operator_retry
Resubscribe on error up to the configured retry count.
See Also
- CatchOperator - On error, switch to a recovery observable.
- IntoResultOperator - Capture next/error signals as Result values.
- LiftResultOperator - Split Result values into next and error signals.
- ErrorBoundaryOperator - Enforce Never as the error type to guard pipelines at compile time.
Example
Run the example with:
cargo run -p rx_core --example operator_retry_example
let mut retried = concat((
(0..=2).into_observable().map_never(),
throw("error").map_never(),
))
.retry(2);
let _s1 = retried.subscribe(PrintObserver::new("retry_operator"));
Output:
retry_operator - next: 0
retry_operator - next: 1
retry_operator - next: 2
retry_operator - next: 0
retry_operator - next: 1
retry_operator - next: 2
retry_operator - next: 0
retry_operator - next: 1
retry_operator - next: 2
retry_operator - error: "error"
retry_operator - unsubscribed
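In the output above, retry(2) leads to three runs of the source in total: the initial subscription plus two retries, after which the error is passed on. The sketch below models only that counting logic. retry_sketch is a hypothetical function, and a closure that writes into a Vec stands in for subscribing to the source.

```rust
/// Hypothetical model of `retry`: `subscribe` is invoked once, then again
/// after each failure until `retries` retries have been used up.
pub fn retry_sketch<T, E>(
    retries: usize,
    mut subscribe: impl FnMut(&mut Vec<T>) -> Result<(), E>,
) -> (Vec<T>, Result<(), E>) {
    let mut emitted = Vec::new();
    let mut retries_left = retries;
    loop {
        match subscribe(&mut emitted) {
            // The source completed normally: stop retrying.
            Ok(()) => return (emitted, Ok(())),
            // No retries left: forward the error downstream.
            Err(error) if retries_left == 0 => return (emitted, Err(error)),
            // Otherwise swallow the error and resubscribe.
            Err(_) => retries_left -= 1,
        }
    }
}
```

Values emitted before each failure are kept, which is why the example prints 0, 1, 2 three times before the error.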
operator_scan
Accumulate state and emit every intermediate result.
See Also
- ReduceOperator - Fold values and emit only the final accumulator on completion.
Example
cargo run -p rx_core --example operator_scan_example
let _subscription = (0..=10)
.into_observable()
.scan(|acc, next| acc + next, 0)
.subscribe(PrintObserver::new("scan_operator"));
Output:
scan_operator - next: 0
scan_operator - next: 1
scan_operator - next: 3
scan_operator - next: 6
scan_operator - next: 10
scan_operator - next: 15
scan_operator - next: 21
scan_operator - next: 28
scan_operator - next: 36
scan_operator - next: 45
scan_operator - next: 55
scan_operator - completed
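The difference between scan and reduce mirrors the difference between Iterator::scan and Iterator::fold in the Rust standard library. The sketch below uses only std iterators to show it; running_sums and final_sum are hypothetical helper names. Note that Iterator::scan takes the seed first and a closure over mutable state, which differs from the scan(|acc, next| ..., seed) call shown above.

```rust
/// Emits every intermediate accumulator, like the `scan` operator.
pub fn running_sums(values: impl IntoIterator<Item = i32>) -> Vec<i32> {
    values
        .into_iter()
        .scan(0, |acc, next| {
            *acc += next;
            Some(*acc)
        })
        .collect()
}

/// Emits only the final accumulator, like the `reduce` operator.
pub fn final_sum(values: impl IntoIterator<Item = i32>) -> i32 {
    values.into_iter().fold(0, |acc, next| acc + next)
}
```

For the range 0..=10, running_sums returns the eleven intermediate values listed in the output above, and final_sum returns only the last of them.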
operator_share
Multicast a source through a connector so downstream subscribers share one upstream subscription.
See Also
- ConnectableObservable - Maintains an internal connector subject that subscribes to a source observable only when the connect function is called on it. Subscribers of the ConnectableObservable will subscribe to this internal connector.
Example
cargo run -p rx_core --example operator_share_example
use std::time::Duration;
use rx_core::prelude::*;
use rx_core_testing::MockExecutor;
fn main() {
let mut executor = MockExecutor::new_with_logging();
let scheduler = executor.get_scheduler_handle();
let shared_interval = interval(
IntervalObservableOptions {
duration: Duration::from_secs(1),
max_emissions_per_tick: 10,
..Default::default()
},
scheduler,
)
.finalize(|| println!("shared interval: unsubscribed"))
.tap_next(|n| println!("shared interval next: {n}"))
.share::<ProvideWithDefault<PublishSubject<_, _>>>(ConnectableOptions::default());
// No subscriptions yet, these will not advance the interval as there isn't one
executor.tick(Duration::from_secs(7));
let _s1 = shared_interval
.clone()
.subscribe(PrintObserver::new("share_operator_1"));
// A subscription was established, now that share is hot, there is an active interval subscription!
executor.tick(Duration::from_secs(4));
let _s2 = shared_interval
.clone()
.subscribe(PrintObserver::new("share_operator_2"));
// A subscription was already hot, the same interval output is received by the second subscription too
executor.tick(Duration::from_secs(2));
}
Output:
Ticking... (7s)
Ticking... (4s)
shared interval next: 0
share_operator_1 - next: 0
shared interval next: 1
share_operator_1 - next: 1
shared interval next: 2
share_operator_1 - next: 2
shared interval next: 3
share_operator_1 - next: 3
Ticking... (2s)
shared interval next: 4
share_operator_2 - next: 4
share_operator_1 - next: 4
shared interval next: 5
share_operator_2 - next: 5
share_operator_1 - next: 5
share_operator_2 - unsubscribed
share_operator_1 - unsubscribed
shared interval: unsubscribed
operator_skip
Drop the first n values.
See Also
- FilterOperator - Keep values that satisfy a predicate.
- FilterMapOperator - Map values to an Option and keep only the Some values.
- TakeOperator - Emit only the first n values, then complete.
- LiftOptionOperator - Filter out None and forward Some values.
Example
cargo run -p rx_core --example operator_skip_example
let _subscription = (1..=5)
.into_observable()
.skip(2)
.subscribe(PrintObserver::new("skip_operator"));
Output:
skip_operator - next: 3
skip_operator - next: 4
skip_operator - next: 5
skip_operator - completed
skip_operator - unsubscribed
operator_start_with
Emit a value first when subscribing to the source.
See Also
- EndWithOperator - Emit a value on completion.
Example
cargo run -p rx_core --example operator_start_with_example
let _subscription = (1..=5)
.into_observable()
.start_with(99)
.subscribe(PrintObserver::new("start_with_operator"));
Output:
start_with_operator - next: 99
start_with_operator - next: 1
start_with_operator - next: 2
start_with_operator - next: 3
start_with_operator - next: 4
start_with_operator - next: 5
start_with_operator - completed
start_with_operator - unsubscribed
operator_switch_all
Example
cargo run -p rx_core --example operator_switch_all_example
operator_switch_map
Map each value to an inner observable and switch to the latest, unsubscribing previous ones.
See Also
- ConcatAllOperator - Subscribes to upstream observables one at a time in order.
- MergeAllOperator - Subscribes to upstream observables and merges their emissions concurrently.
- SwitchAllOperator - Switch to the newest inner observable, unsubscribing previous ones.
- ExhaustAllOperator - Ignore new inner observables while one is active.
- ConcatMapOperator - Map each value to an inner observable and subscribe to them one at a time in order.
- MergeMapOperator - Map each value to an inner observable and merge their emissions concurrently.
- ExhaustMapOperator - Map each value to an inner observable and ignore new ones while one is active.
Example
cargo run -p rx_core --example operator_switch_map_operator_example
let _subscription = (1..=3)
.into_observable()
.switch_map(|next| IteratorObservable::new(1..=next), Never::map_into())
.subscribe(PrintObserver::new("switch_map"));
Output:
switch_map - next: 1
switch_map - next: 1
switch_map - next: 2
switch_map - next: 1
switch_map - next: 2
switch_map - next: 3
switch_map - completed
switch_map - unsubscribed
operator_take
Emit only the first n values, then complete.
See Also
- FilterOperator - Keep values that satisfy a predicate.
- FilterMapOperator - Map values to an Option and keep only the Some values.
- SkipOperator - Drop the first n values.
- LiftOptionOperator - Filter out None and forward Some values.
Example
cargo run -p rx_core --example operator_take_example
let _subscription = (1..=5)
.into_observable()
.take(2)
.subscribe(PrintObserver::new("take_operator"));
Output:
take_operator - next: 1
take_operator - next: 2
take_operator - completed
take_operator - unsubscribed
operator_tap
The tap operator lets you forward upstream values to a destination observer.
The destination could be anything: a PrintObserver to log upstream values to the console, or even a subject to trigger another pipeline.
Good to know
- Keep in mind that the destination observer passed in will not get upgraded [1], meaning a tap operator will never call unsubscribe on the destination, even if it’s a subscriber that upgrades to itself and has an unsubscribe implementation. However, the error and complete signals are forwarded. If you want to avoid forwarding error and complete, use tap_next instead.
See Also
- TapNextOperator - Run a callback for each next value while letting signals pass through.
- OnNextOperator - Invoke a callback for each value that can also decide whether to forward it.
- OnSubscribeOperator - Run a callback when a subscription is established.
- FinalizeOperator - Execute cleanup when the observable finishes or unsubscribes.
Example
cargo run -p rx_core --example operator_tap_example
(1..=3)
.into_observable()
.tap(PrintObserver::new("tap_destination"))
.subscribe(PrintObserver::new("tap_operator"));
Output:
tap_destination - next: 1
tap_operator - next: 1
tap_destination - next: 2
tap_operator - next: 2
tap_destination - next: 3
tap_operator - next: 3
tap_destination - completed
tap_operator - completed
tap_operator - unsubscribed
[1] Documentation on UpgradeableObserver
operator_tap_next
Run a callback for each next value while letting signals pass through.
See Also
- TapOperator - Mirror all signals into another observer.
- OnNextOperator - Invoke a callback for each value that can also decide whether to forward it.
- OnSubscribeOperator - Run a callback when a subscription is established.
- FinalizeOperator - Execute cleanup when the observable finishes or unsubscribes.
Example
cargo run -p rx_core --example operator_tap_next_example
let _subscription = (1..=5)
.into_observable()
.tap_next(|next| println!("hello {next}"))
.subscribe(PrintObserver::new("tap_operator"));
Output:
hello 1
tap_operator - next: 1
hello 2
tap_operator - next: 2
hello 3
tap_operator - next: 3
hello 4
tap_operator - next: 4
hello 5
tap_operator - next: 5
tap_operator - completed
tap_operator - unsubscribed
operator_with_latest_from
Example
cargo run -p rx_core --example operator_with_latest_from_example
let mut source = PublishSubject::<usize, &'static str>::default();
let mut inner = PublishSubject::<&'static str, &'static str>::default();
let _subscription = source
.clone()
.with_latest_from(inner.clone())
.subscribe(PrintObserver::new("with_latest_from_operator"));
source.next(1);
inner.next("hello");
source.next(2);
source.next(3);
source.next(4);
inner.next("bello");
source.next(5);
inner.error("error");
Output:
with_latest_from_operator - next: (2, "hello")
with_latest_from_operator - next: (3, "hello")
with_latest_from_operator - next: (4, "hello")
with_latest_from_operator - next: (5, "bello")
with_latest_from_operator - error: "error"
with_latest_from_operator - unsubscribed
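This section has no prose description, so the following sketch is derived only from the output above: source values are paired with the most recent inner value, and source values that arrive before the inner observable has emitted anything are dropped. WithLatestFrom, inner_next and source_next are hypothetical names for this illustration, and error forwarding is not modelled.

```rust
/// Hypothetical state machine mirroring the example above (not rx_core code).
pub struct WithLatestFrom<I> {
    latest_inner: Option<I>,
}

impl<I: Clone> WithLatestFrom<I> {
    pub fn new() -> Self {
        Self { latest_inner: None }
    }

    /// Records a value from the inner source; nothing is emitted.
    pub fn inner_next(&mut self, value: I) {
        self.latest_inner = Some(value);
    }

    /// Pairs a source value with the latest inner value, if one exists yet.
    pub fn source_next<S>(&self, value: S) -> Option<(S, I)> {
        self.latest_inner.clone().map(|inner| (value, inner))
    }
}
```

Replaying the example's calls against this model drops the source value 1 and pairs 2, 3 and 4 with "hello" and 5 with "bello".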
Schedulers
scheduler_ticking
Subjects
subject_async
Reduces observed values into one and emits it to active subscribers once completed. Once completed, it also replays the result to late subscribers.
See Also
- PublishSubject - Forwards observed signals to active subscribers without replaying values, but terminal state is replayed.
- BehaviorSubject - Always holds a value that is replayed to late subscribers.
- ReplaySubject - Buffers the last N values and replays them to late subscribers.
- ProvenanceSubject - BehaviorSubject that also stores an additional filtering value to track provenance.
Example
Run the example with:
cargo run -p rx_core --example subject_async_example
use rx_core::prelude::*;
fn main() {
let mut subject = AsyncSubject::<i32>::default();
let mut _subscription_1 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("async_subject sub_1"));
subject.next(1);
subject.next(2);
let mut _subscription_2 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("async_subject sub_2"));
subject.next(3);
subject.complete();
let mut _subscription_3 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("async_subject sub_3"));
}
Output:
async_subject sub_1 - next: 3
async_subject sub_2 - next: 3
async_subject sub_1 - completed
async_subject sub_1 - unsubscribed
async_subject sub_2 - completed
async_subject sub_2 - unsubscribed
async_subject sub_3 - next: 3
async_subject sub_3 - completed
async_subject sub_3 - unsubscribed
subject_behavior
Always holds a value that is replayed to late subscribers.
See Also
- PublishSubject - Forwards observed signals to active subscribers without replaying values, but terminal state is replayed.
- AsyncSubject - Reduces observed values into one and emits it on completion, replaying the result to late subscribers.
- ReplaySubject - Buffers the last N values and replays them to late subscribers.
- ProvenanceSubject - BehaviorSubject that also stores an additional filtering value to track provenance.
Example
Run the example with:
cargo run -p rx_core --example subject_behavior_example
use rx_core::prelude::*;
fn main() {
let mut subject = BehaviorSubject::<i32>::new(10);
let mut hello_subscription = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hello"));
subject.next(11);
let _s1 = subject
.clone()
.map(|next| next * 2)
.subscribe(PrintObserver::<i32>::new("hi double"));
subject.next(12);
hello_subscription.unsubscribe();
subject.next(13);
subject.complete();
let mut _completed_subscription = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hello_completed"));
}
Output:
hello - next: 10
hello - next: 11
hi double - next: 22
hi double - next: 24
hello - next: 12
hello - unsubscribed
hi double - next: 26
hi double - completed
hi double - unsubscribed
hello_completed - next: 13
hello_completed - completed
hello_completed - unsubscribed
subject_provenance
BehaviorSubject that also stores an additional filtering value to track provenance so subscribers can filter by origin.
See Also
- PublishSubject - Forwards observed signals to active subscribers without replaying values, but terminal state is replayed.
- AsyncSubject - Reduces observed values into one and emits it on completion, replaying the result to late subscribers.
- BehaviorSubject - Always holds a value that is replayed to late subscribers.
- ReplaySubject - Buffers the last N values and replays them to late subscribers.
Example
Run the example with:
cargo run -p rx_core --example subject_provenance_example
use rx_core::prelude::*;
#[derive(PartialEq, Clone, Debug)]
enum ExampleProvenance {
Foo,
Bar,
}
fn main() {
let mut subject = ProvenanceSubject::<ExampleProvenance, usize>::new(
10,
ExampleProvenance::Foo,
);
let _all_subscription = subject
.clone()
.all()
.subscribe(PrintObserver::<usize>::new("provenance_ignored"));
let _bar_subscription = subject
.clone()
.only_by_provenance(ExampleProvenance::Bar)
.subscribe(PrintObserver::<usize>::new("provenance_bar"));
let _foo_subscription = subject
.clone()
.only_by_provenance(ExampleProvenance::Foo)
.subscribe(PrintObserver::<usize>::new("provenance_foo"));
subject.next((1, ExampleProvenance::Foo));
subject.next((2, ExampleProvenance::Bar));
subject.next((3, ExampleProvenance::Foo));
subject.next((4, ExampleProvenance::Bar));
}
Output:
provenance_ignored - next: 10
provenance_foo - next: 10
provenance_ignored - next: 1
provenance_foo - next: 1
provenance_bar - next: 2
provenance_ignored - next: 2
provenance_ignored - next: 3
provenance_foo - next: 3
provenance_bar - next: 4
provenance_ignored - next: 4
provenance_foo - unsubscribed
provenance_bar - unsubscribed
provenance_ignored - unsubscribed
subject_publish
Forwards observed signals to all active subscribers. Does not replay values to late subscribers, but always replays terminal state.
See Also
- AsyncSubject - Reduces observed values into one and emits it on completion, replaying the result to late subscribers.
- BehaviorSubject - Always holds a value that is replayed to late subscribers.
- ReplaySubject - Buffers the last N values and replays them to late subscribers.
- ProvenanceSubject - BehaviorSubject that also stores an additional filtering value to track provenance.
Example
cargo run -p rx_core --example subject_publish_example
use rx_core::prelude::*;
fn main() {
let mut subject = PublishSubject::<i32>::default();
subject.next(1);
let mut subscription = subject
.clone()
.subscribe(PrintObserver::<i32>::new("subject_example"));
subject.next(2);
subject.next(3);
subscription.unsubscribe();
subject.next(4);
subject.complete();
let _subscription_2 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("subject_example_2"));
}
Output:
subject_example - next: 2
subject_example - next: 3
subject_example - unsubscribed
subject_example_2 - completed
subject_example_2 - unsubscribed
subject_replay
Buffers the last N values and replays them to late subscribers.
See Also
- PublishSubject - Forwards observed signals to active subscribers without replaying values, but terminal state is replayed.
- AsyncSubject - Reduces observed values into one and emits it on completion, replaying the result to late subscribers.
- BehaviorSubject - Always holds a value that is replayed to late subscribers.
- ProvenanceSubject - BehaviorSubject that also stores an additional filtering value to track provenance.
Example
cargo run -p rx_core --example subject_replay_example
use rx_core::prelude::*;
fn main() {
let mut subject = ReplaySubject::<2, i32>::default();
let _s = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hello"));
subject.next(1);
subject.next(2);
subject.next(3);
let _s2 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hi"));
subject.next(4);
subject.next(5);
}
Output:
hello - next: 1
hello - next: 2
hello - next: 3
hi - next: 2
hi - next: 3
hi - next: 4
hello - next: 4
hi - next: 5
hello - next: 5
hi - unsubscribed
hello - unsubscribed
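The "last N values" bookkeeping can be pictured as a fixed-capacity queue that drops its oldest entry when full. The sketch below shows only that buffer, under the assumption that this is a fair mental model. ReplayBuffer is a hypothetical type, not the storage rx_core actually uses, and subscriber handling is left out.

```rust
use std::collections::VecDeque;

/// Hypothetical fixed-capacity replay buffer (not rx_core's ReplaySubject).
pub struct ReplayBuffer<const N: usize, T> {
    values: VecDeque<T>,
}

impl<const N: usize, T: Clone> ReplayBuffer<N, T> {
    pub fn new() -> Self {
        Self {
            values: VecDeque::with_capacity(N),
        }
    }

    /// Stores a value, evicting the oldest one once N values are held.
    pub fn push(&mut self, value: T) {
        if N == 0 {
            return;
        }
        if self.values.len() == N {
            self.values.pop_front();
        }
        self.values.push_back(value);
    }

    /// The values a late subscriber would receive, oldest first.
    pub fn replay(&self) -> Vec<T> {
        self.values.iter().cloned().collect()
    }
}
```

With N = 2 and the values 1, 2 and 3 pushed, replay returns 2 and 3, which matches what the late "hi" subscriber receives in the example.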
Development
Writing Tests
Integration Testing & Contract Adherence
Beyond writing tests for the main purpose of an observable or operator, you must also include tests for other standard behavior, such as making sure teardowns were forwarded and executed on unsubscribe, and that resources are disposed even when downstream unsubscribed after an action.
The more complex an observable/operator is, the easier it is to accidentally break an expected standard behavior. That is why it’s important to test for all of these requirements, even if the observable/operator itself does not need to consider them in its implementation due to its simplicity.
If you develop an observable/operator outside of this repository, you do not need to follow the standard test structure this repository is using, but it is recommended. To ensure your observable/operator behaves as expected, it’s enough to follow and test for the Runtime Contracts.
Test Organization
See also: Rust Book / Test Organization
Contract testing is integration testing, and it should be done from the same “outside” perspective as the users would use your observable/operator.
- Integration tests should be implemented in the tests folder of your crate.
- Unit tests can be put anywhere; having them in the same file as the thing tested is preferred, as it gives you access to its private internals for assertions.
Code Coverage & Dead Code Elimination
Always start with integration testing first! This gives you an opportunity to see if your implementation has any dead code: code that doesn’t even run while still operating correctly.
Since at this point you’ve only tested your observable/operator from the outside, now you can evaluate what to do with code that was not covered by your tests depending on whether or not it’s even possible to reach it:
- If it’s possible and is related to your logic, you’ve missed testing a feature!
- If it’s possible and is not related to your logic but to standard behavior, and the contract tests didn’t cover it, then you found an edge case and a new contract should be added!
- If it’s not possible to reach it, deleting it depends on whether that piece of code has any purpose when used somewhere other than your operator/observable. If it’s a subscriber exported by your crate, someone else may write another operator with it that does make that piece of code reachable. Depending on this, you may either write a unit test for it or delete the useless code.
What 100% Test Coverage Means
In general
It’s very important to recognize that 100% test coverage does not mean
your project is completely bug free! It means that it does not have any dead
code, and that every feature is at least partially tested!
It’s still very useful, as it gives you confidence that new changes will not break existing features, and your users can be confident that at least one test exists for any feature they may end up using.
Code coverage should be thought of as an outward-facing metric for users. A confidence score of sorts. Therefore it’s not required to include code in the coverage that will never reach a user. For example, private crates that are usually dev tools are irrelevant for the user and can safely be excluded from code coverage.
Requiring it at a CI/CD level also gives you some benefits, not just in teams but for solo projects as well:
- It forces you to write tests for every feature, eliminating the possibility of forgetting to test something! (Again, it does not mean you have covered all edge cases, only that it’s at least tested!)
Integration Tests
Each test file should organize tests in modules in a BDD fashion, where each module describes a scenario like when_option_is_false or when_already_unsubscribed etc.
This helps keep actual test function names short while still being readable in the test output.
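As an illustration of this layout, here is a minimal sketch of a test file organized by scenario modules. take_first is a hypothetical function standing in for an observable or operator under test, and the module and test names are made up for this example.

```rust
/// Hypothetical function under test; it stands in for an observable/operator.
pub fn take_first(values: &[i32], count: usize) -> Vec<i32> {
    values.iter().copied().take(count).collect()
}

#[cfg(test)]
mod when_count_is_zero {
    use super::take_first;

    #[test]
    fn should_emit_nothing() {
        assert!(take_first(&[1, 2, 3], 0).is_empty());
    }
}

#[cfg(test)]
mod when_count_exceeds_length {
    use super::take_first;

    #[test]
    fn should_emit_every_value() {
        assert_eq!(take_first(&[1, 2, 3], 10), vec![1, 2, 3]);
    }
}
```

Relative to the root of the file, the test runner reports these as when_count_is_zero::should_emit_nothing and when_count_exceeds_length::should_emit_every_value, so the scenario and the expectation read as one sentence.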
Contracts
These contracts are intended for people who write observables or operators to ensure correct operation.
This isn’t a guide on how to create your own observables and operators, but a set of additional rules to check after you have made one.
To learn the basics of how to write your own observables and operators, see Writing Observables and Writing Operators.
To learn how to write them in the same fashion as rx_core and rx_bevy do, with separate crates for every observable/operator and an aggregator crate, see contributing.md.
Every observable and operator must uphold these invariants to ensure the expected runtime behavior. If they aren’t met, it should be treated as a bug!
Contracts are identified by their “contract code”, always starting with
rx_contract_.
Each contract features one or more verifications identified by
“verification codes”, always starting with rx_verify_.
Tests
It is highly advised to have at least one test for each contract defined here wherever applicable!
Each contract should have its own test with the same name as the contract. The test should feature individual assertions with the verification code as part of the failure message.
rx_core_testing contains test harnesses that can test for some of these contracts, saving time implementing the tests, ensuring every verification and extra assertion is made.
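A contract test following this naming scheme could look like the sketch below. It does not use the rx_core_testing harnesses: RecordingSubscriber is a hypothetical stand-in written only for this illustration, and the function is left without the #[test] attribute so it can be called directly; in a real test file it would carry that attribute.

```rust
/// Hypothetical recording subscriber used only for this sketch.
#[derive(Default)]
pub struct RecordingSubscriber {
    pub notifications: Vec<String>,
    closed: bool,
}

impl RecordingSubscriber {
    pub fn next(&mut self, value: i32) {
        if !self.closed {
            self.notifications.push(format!("next: {value}"));
        }
    }

    pub fn error(&mut self, error: &str) {
        if !self.closed {
            self.notifications.push(format!("error: {error}"));
            self.closed = true;
        }
    }

    pub fn is_closed(&self) -> bool {
        self.closed
    }
}

/// Named after the contract; each assertion carries its verification code.
pub fn rx_contract_closed_after_error() {
    let mut subscriber = RecordingSubscriber::default();
    subscriber.next(1);
    subscriber.error("boom");

    assert_eq!(
        subscriber.notifications.last().map(String::as_str),
        Some("error: boom"),
        "rx_verify_errored"
    );
    assert!(subscriber.is_closed(), "rx_verify_closed");

    // Signals sent after closing must not produce new notifications.
    let count = subscriber.notifications.len();
    subscriber.next(2);
    subscriber.error("again");
    assert_eq!(
        subscriber.notifications.len(),
        count,
        "rx_verify_no_new_notification_after_closed"
    );
}
```

When an assertion fails, the verification code in the message points at the exact rule that was broken.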
rx_contract_closed_after_error
Applies to:
- Observables
- Operators
- Subscribers
Once a subscriber emits an Error notification, it is considered “errored”,
and it should be closed and its teardowns executed.
A subscriber is considered errored when it emits an error signal, not when it receives one! Some operators are designed to handle errors.
Test must verify:
- rx_verify_errored: An Error notification was observed.
- rx_verify_closed: is_closed returns true after an Error notification was observed.
- rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
- If Observable or Operator:
  - rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
  - rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
- If Operator:
  - rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
- If there are input observables:
  - rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
- If Scheduled:
  - rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.
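As an illustration of how a contract test could be laid out, here is a minimal sketch for rx_contract_closed_after_error. The RecordingSubscriber and Notification types are hypothetical stand-ins written for this example, not the rx_core or rx_core_testing API; only the naming scheme (test named after the contract, verification codes in the failure messages) comes from the text above.

```rust
// Hypothetical stand-ins for illustration only; not the rx_core API.
#[derive(Debug)]
enum Notification {
    Next(i32),
    Error(String),
    Complete,
    Unsubscribe,
}

/// A toy subscriber that records what it emits and closes itself on
/// error, complete or unsubscribe.
#[derive(Default)]
struct RecordingSubscriber {
    emitted: Vec<Notification>,
    closed: bool,
}

impl RecordingSubscriber {
    fn next(&mut self, value: i32) {
        if !self.closed {
            self.emitted.push(Notification::Next(value));
        }
    }

    fn error(&mut self, error: &str) {
        if !self.closed {
            self.emitted.push(Notification::Error(error.to_string()));
            self.closed = true;
        }
    }

    fn complete(&mut self) {
        if !self.closed {
            self.emitted.push(Notification::Complete);
            self.closed = true;
        }
    }

    fn unsubscribe(&mut self) {
        if !self.closed {
            self.emitted.push(Notification::Unsubscribe);
            self.closed = true;
        }
    }

    fn is_closed(&self) -> bool {
        self.closed
    }
}

/// Named after the contract. In a real suite this would carry `#[test]`.
/// Every assertion message carries its verification code.
fn rx_contract_closed_after_error() {
    let mut subscriber = RecordingSubscriber::default();
    subscriber.next(1);
    subscriber.error("boom");

    assert!(
        matches!(subscriber.emitted.last(), Some(Notification::Error(_))),
        "rx_verify_errored"
    );
    assert!(subscriber.is_closed(), "rx_verify_closed");

    // None of these may produce a new emission after closing.
    let emitted_before = subscriber.emitted.len();
    subscriber.next(2);
    subscriber.error("again");
    subscriber.complete();
    subscriber.unsubscribe();
    assert_eq!(
        subscriber.emitted.len(),
        emitted_before,
        "rx_verify_no_new_notification_after_closed"
    );
}
```

The teardown and scheduler verifications are left out of this sketch because they depend on the subscription and scheduler types of the actual crates.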
rx_contract_closed_after_complete
Applies to:
- Observables
- Operators
- Subscribers
Once an observable or a subscriber emits a Complete notification, it is
considered “completed”, and it should be closed, with its teardowns executed.
A subscriber is considered completed when it emits a complete signal, not when it receives one! Some operators complete later, for example: delay.
Test must verify:
- rx_verify_completed: A Complete notification was observed.
- rx_verify_closed: is_closed returns true after a Complete notification was observed.
- rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
- If Observable or Operator:
  - rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
  - rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
- If Operator:
  - rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
- If there are input observables:
  - rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
- If Scheduled:
  - rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.
rx_contract_closed_after_unsubscribe
Applies to:
- Observables
- Operators
- Subscribers
An observable is not considered completed or errored when its subscription is unsubscribed. It’s a cancellation.
Test must verify:
- rx_verify_unsubscribed: An Unsubscribe notification was observed.
- rx_verify_closed: is_closed returns true after an Unsubscribe notification was observed.
- rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
- If Observable or Operator:
  - rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
  - rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
- If Operator:
  - rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
- If there are input observables:
  - rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
- If Scheduled:
  - rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.
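The teardown verifications can be pictured with a toy subscription that stores closures and runs them exactly once on unsubscribe. This is a minimal sketch under the assumption that teardowns are plain FnOnce closures; ToySubscription is a hypothetical type made up for this example and does not reflect how rx_core stores teardowns.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical minimal subscription; not the rx_core type.
#[derive(Default)]
struct ToySubscription {
    teardowns: Vec<Box<dyn FnOnce()>>,
    closed: bool,
}

impl ToySubscription {
    /// Registers a teardown to run when the subscription is unsubscribed.
    fn add_teardown(&mut self, teardown: impl FnOnce() + 'static) {
        self.teardowns.push(Box::new(teardown));
    }

    /// Closes the subscription and runs every teardown once.
    /// Calling it again is a no-op.
    fn unsubscribe(&mut self) {
        if self.closed {
            return;
        }
        self.closed = true;
        for teardown in self.teardowns.drain(..) {
            teardown();
        }
    }

    fn is_closed(&self) -> bool {
        self.closed
    }
}

/// Named after the contract; returns how often the teardown ran.
fn rx_contract_closed_after_unsubscribe() -> u32 {
    let runs = Rc::new(Cell::new(0u32));
    let counter = Rc::clone(&runs);

    let mut subscription = ToySubscription::default();
    subscription.add_teardown(move || counter.set(counter.get() + 1));

    subscription.unsubscribe();
    assert!(subscription.is_closed(), "rx_verify_closed");
    assert_eq!(runs.get(), 1, "rx_verify_subscription_teardowns_executed");

    // A second unsubscribe must not run the teardown again.
    subscription.unsubscribe();
    assert_eq!(runs.get(), 1, "rx_verify_no_new_notification_after_closed");

    runs.get()
}
```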
rx_contract_closed_if_downstream_closes_early
Applies to:
- Observables
- Operators
- Subscribers
A subscription must be closed if a downstream operator like take(1+) closes
it early.
Test must verify:
- rx_verify_closed: is_closed returns true after an Unsubscribe notification was observed.
- rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
- If Observable or Operator:
  - rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
  - rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
- If Operator:
  - rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
- If there are input observables:
  - rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
- If Scheduled:
  - rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.
rx_contract_closed_if_downstream_closes_immediately
Applies to:
- Observables
- Operators
- Subscribers
A subscription must be closed if a downstream operator like take(0) closes it
immediately.
Test must verify:
- rx_verify_closed: is_closed returns true after an Unsubscribe notification was observed.
- rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
- If Observable or Operator:
  - rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
  - rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
- If Operator:
  - rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
- If there are input observables:
  - rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
- If Scheduled:
  - rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.
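Both this contract and the previous one deal with a downstream that closes before the source runs out. The sketch below models a take-like destination with a hypothetical TakeLike type and a toy producer, emit_range; neither is part of rx_core. It assumes that a count of 0 means the destination is closed from the start.

```rust
/// Hypothetical take-like destination: accepts `remaining` values, then closes.
struct TakeLike {
    remaining: usize,
    received: Vec<i32>,
    closed: bool,
}

impl TakeLike {
    fn new(count: usize) -> Self {
        Self {
            remaining: count,
            received: Vec::new(),
            // Assumption: a count of 0 closes immediately.
            closed: count == 0,
        }
    }

    fn next(&mut self, value: i32) {
        if self.closed {
            return;
        }
        self.received.push(value);
        self.remaining -= 1;
        if self.remaining == 0 {
            self.closed = true;
        }
    }

    fn is_closed(&self) -> bool {
        self.closed
    }
}

/// Toy producer: stops as soon as the destination is closed and
/// returns how many values it actually tried to send.
fn emit_range(destination: &mut TakeLike, values: std::ops::Range<i32>) -> usize {
    let mut attempts = 0;
    for value in values {
        if destination.is_closed() {
            break;
        }
        attempts += 1;
        destination.next(value);
    }
    attempts
}
```

With a count of 2 the producer stops after two sends; with a count of 0 it never sends anything, which is the "closes immediately" case.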
rx_contract_immediate_completion
Applies to:
- Observables
- Operators (if they can complete on their own)
- Subscribers (if they can complete on their own)
Once it is known that further emissions are impossible, completion should be immediate.
For example, knowing when an iterator is finished is trivial: after the last next, a complete must immediately follow.
But combination observables have to deduce their own completion based on the Observables they combine:
- CombineLatestObservable, when already primed, completes only when all of its inner observables have finished emitting values! Before it’s primed, it completes when any of its inner observables complete or unsubscribe, as priming then becomes impossible.
- ZipObservable completes when any of its inner observables have finished emitting values!
Test must verify:
- rx_verify_immediately_completed: After the last next signal, a complete signal should be observed immediately.
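For the iterator case, immediate completion simply means the complete signal is sent in the same synchronous pass that exhausts the iterator. The following sketch records signals into a Vec instead of calling a real observer; Signal and emit_iterator are hypothetical names used only for this illustration.

```rust
/// Hypothetical signal type for this sketch.
#[derive(Debug, PartialEq)]
enum Signal {
    Next(i32),
    Complete,
}

/// Toy iterator-backed producer that records the signals it would send.
fn emit_iterator(values: impl IntoIterator<Item = i32>) -> Vec<Signal> {
    let mut signals = Vec::new();
    for value in values {
        signals.push(Signal::Next(value));
    }
    // The iterator is exhausted, so further emissions are impossible:
    // complete right away, in the same synchronous call.
    signals.push(Signal::Complete);
    signals
}
```

An empty iterator yields only the Complete signal, since there is nothing to wait for.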
rx_contract_do_not_complete_until_necessary
Applies to:
- Combination Observables
Combination observables should not complete until it becomes impossible to emit further values.
Test must verify:
rx_verify_not_closed: If a single input observable unsubscribed, but another one can still trigger emissions, the observable itself should not complete yet.
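The completion rules stated for CombineLatestObservable and ZipObservable under rx_contract_immediate_completion can be written down as two small decision functions. This is a literal reading of those sentences, not the actual implementation: InputState and both function names are hypothetical, and real operators may track more state (for example buffered values).

```rust
/// Hypothetical per-input bookkeeping for a combination observable.
#[derive(Clone, Copy)]
struct InputState {
    /// The input has emitted at least one value.
    has_emitted: bool,
    /// The input has completed or unsubscribed.
    finished: bool,
}

/// CombineLatest-like rule: once primed (every input has emitted), complete
/// only when all inputs are finished; before that, any finished input
/// triggers completion.
fn combine_latest_should_complete(inputs: &[InputState]) -> bool {
    let primed = inputs.iter().all(|input| input.has_emitted);
    if primed {
        inputs.iter().all(|input| input.finished)
    } else {
        inputs.iter().any(|input| input.finished)
    }
}

/// Zip-like rule: complete as soon as any input is finished.
fn zip_should_complete(inputs: &[InputState]) -> bool {
    inputs.iter().any(|input| input.finished)
}
```

The rx_verify_not_closed case corresponds to a primed CombineLatest-like observable where one input is finished and another is still live: the function returns false there.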
Additional Guidelines
These are additional guidelines to better adhere to the contracts. Some of them are indirectly verified by contracts, some of them are not testable. Either way these do not need their own tests, and as such, can’t be considered contracts by themselves. More like reminders, or suggestions.
Operator Subscribers must always forward all upstream signals downstream unless altering them is their expected behavior
Downstream operators depend on signals too, don’t forget to forward them!
The map operator’s only job is to turn the upstream next signal into its mapped value and forward it downstream. It does not alter the behavior of error, complete and unsubscribe, so it must call the same function on its destination.
For example:
fn complete(&mut self) {
    // Forward the signal unchanged to the destination.
    self.destination.complete();
}
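To show the forwarding rule for every signal at once, here is a self-contained sketch of a map-like subscriber. ToyObserver, MapSubscriber and Recorder are hypothetical types defined only for this example; the real rx_core traits have different signatures, and using String as the error type is an arbitrary simplification.

```rust
/// Hypothetical observer trait for this sketch; the real rx_core traits differ.
trait ToyObserver<In> {
    fn next(&mut self, value: In);
    fn error(&mut self, error: String);
    fn complete(&mut self);
    fn unsubscribe(&mut self);
}

/// A map-like subscriber: transforms `next`, forwards everything else as-is.
struct MapSubscriber<Mapper, Destination> {
    mapper: Mapper,
    destination: Destination,
}

impl<In, Out, Mapper, Destination> ToyObserver<In> for MapSubscriber<Mapper, Destination>
where
    Mapper: FnMut(In) -> Out,
    Destination: ToyObserver<Out>,
{
    fn next(&mut self, value: In) {
        self.destination.next((self.mapper)(value));
    }

    fn error(&mut self, error: String) {
        self.destination.error(error);
    }

    fn complete(&mut self) {
        self.destination.complete();
    }

    fn unsubscribe(&mut self) {
        self.destination.unsubscribe();
    }
}

/// Destination that logs every signal it receives.
#[derive(Default)]
struct Recorder {
    log: Vec<String>,
}

impl ToyObserver<String> for Recorder {
    fn next(&mut self, value: String) {
        self.log.push(format!("next({})", value));
    }

    fn error(&mut self, error: String) {
        self.log.push(format!("error({})", error));
    }

    fn complete(&mut self) {
        self.log.push("complete".to_string());
    }

    fn unsubscribe(&mut self) {
        self.log.push("unsubscribe".to_string());
    }
}
```

If any of the three forwarding methods were left empty, the Recorder would never learn that the stream ended, which is exactly the failure this guideline warns about.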
No unnecessary .is_closed() checks
Only newly produced signals should check if the destination is still open!
For an observable, this does mean every individual signal, as they originate from there.
For example map only transforms values. Upstream won’t ever send anything
after it’s closed. map only interacts with downstream once each time upstream
interacts with it, and returns downstream’s is_closed state, therefore in case
downstream closes early, an upstream producer shouldn’t even try interacting
with it anyway.
The only exceptions are Subjects, where the Observer functions are exposed to the user.
fn next(&mut self, next: Self::In) {
    if !self.destination.is_closed() { // Unnecessary
        self.destination.next((self.mapper)(next));
    }
}
This only applies to the first synchronous interaction with downstream as any interaction with downstream can potentially cause it to be closed:
Incorrect:
fn next(&mut self, next: Self::In) {
    self.destination.next(next.clone()); // Still not necessary to check!
    self.destination.next(next); // Should be checked if not closed!
}
Correct:
fn next(&mut self, next: Self::In) { // Wouldn't even be called if it's closed!
    self.destination.next(next.clone());
    if !self.is_closed() { // The first next could cause downstream to close!
        self.destination.next(next);
    }
}
A subscriber that emits more than once per upstream signal is considered a producer for the extra signals, so the second downstream next call
should be checked!
A loop, for example, should break if further iterations are unnecessary.
for item in self.iterator.clone().into_iter() {
    if destination.is_closed() {
        break;
    }
    destination.next(item);
}
As a rule of thumb, if a subscriber’s is_closed implementation already respects the “closedness” of downstream, then for the very first interaction with it, it does not need to check if downstream is closed, as upstream already did.
What if I don’t?
If you do make extra checks, the penalty is just an extra if.
If you do not check if the destination is closed before sending a new signal, then any work done by downstream operators is also unnecessary.
Neither of these problems is “lethal”; this is about optimization.
Use Never as your signal type if that signal is never sent
The rx_core_common crate exposes the Never type which can’t be
constructed since it’s an enum with no variants.
Never is actually just a type alias for core::convert::Infallible. The reason Infallible isn’t used directly is that its name conveys that it’s an error, while here it could mean any event/signal that can never happen. And that event can be a valid output too, not just an error.
This type MUST be used to denote signals that are never produced instead of
using the unit type () which could be produced, and as such is inadequate to
denote that something won’t ever produce said signal.
- If an Observable never produces an error, it must set its OutError type to Never.
- If an Observable never produces a value, its Out type must be set to Never.
  - For example the ThrowObservable only produces a single error, therefore its Out type is Never.
  - And the NeverObservable never produces anything, so both Out and OutError are Never.
- If a Subscriber never sends errors downstream (for example it catches errors), it also must set its OutError type to Never.
- If a Subscriber never sends values downstream (for example it re-throws them as errors), it also must set its Out type to Never.
Note that in the future, once Rust stabilizes the actual never type (!), the Never type in rx_core_common will be deprecated in favor of it.
Tracking issue: https://github.com/AlexAegis/rx_bevy/issues/27
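To make the difference from the unit type concrete, the sketch below recreates the alias locally and uses it as an associated type. The ToySignals trait and the ToyThrow and ToyNever structs are hypothetical stand-ins for illustration; only the alias to core::convert::Infallible is taken from the text above. Using String as ToyThrow's error type is an arbitrary choice.

```rust
use core::convert::Infallible;

/// Local re-creation of the alias described above.
type Never = Infallible;

/// Hypothetical trait carrying the two signal types; not the real rx_core trait.
trait ToySignals {
    type Out;
    type OutError;
}

/// Only ever errors, so it never produces a value.
struct ToyThrow;

impl ToySignals for ToyThrow {
    type Out = Never;
    type OutError = String;
}

/// Never produces anything at all.
struct ToyNever;

impl ToySignals for ToyNever {
    type Out = Never;
    type OutError = Never;
}

/// Because `Never` has no variants, an empty match is exhaustive:
/// the compiler knows this function can never actually be called.
fn handle_impossible(value: Never) -> i32 {
    match value {}
}

/// A `Result` whose error type is `Never` can be unwrapped without a panic path.
fn unwrap_infallible<T>(result: Result<T, Never>) -> T {
    match result {
        Ok(value) => value,
        Err(never) => match never {},
    }
}
```

With () in place of Never, neither empty match would compile, because a unit value can always be constructed and would have to be handled.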
