Concepts & Terminology
Before diving into the individual observables and operators, let’s go through all the concepts and nomenclature you might encounter, and their definitions.
Pretty much everything within the repository assumes that you already know, to some degree, what each of these means and does.
Concept Hierarchy
Observer (Destination)
The simplest concept, and the one that needs immediate clarification, is the
observer: in the context of rx_bevy, it is not the same thing as a
Bevy observer!
An RxObserver is something that implements three functions, one for each of its 3 observer “channels”, via the RxObserver trait:
- next
- error
- complete
Channels & Signals
Functions on the Observer trait can be thought of as channels, carriers of signals, each with a different purpose and behavior:
- The next channel carries the value signal, the useful content you want to observe.
- The error channel carries errors separately, to enable error handling independently of the values. If you’re curious about why errors are on a separate channel instead of just using Results, see the “Why do errors have their own channel?” section.
- The complete channel carries the completion signal. It signals that no more next or error emissions will come. This signal does not carry any value, and is usually sent right after the last next signal.
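To make the three channels concrete, here is a minimal, self-contained sketch. The SketchObserver trait and LogObserver type are hypothetical stand-ins written only for this illustration; they are not the actual RxObserver trait of the library, which has its own associated types and bounds.

```rust
/// Hypothetical stand-in for a three-channel observer (not the real trait).
trait SketchObserver<In, InError> {
    fn next(&mut self, value: In);
    fn error(&mut self, error: InError);
    fn complete(&mut self);
}

/// Collects one readable log line for every signal it receives.
#[derive(Default)]
struct LogObserver {
    log: Vec<String>,
}

impl SketchObserver<i32, String> for LogObserver {
    fn next(&mut self, value: i32) {
        self.log.push(format!("next: {value}"));
    }

    fn error(&mut self, error: String) {
        self.log.push(format!("error: {error}"));
    }

    fn complete(&mut self) {
        self.log.push("completed".to_string());
    }
}

/// Sends two values on the `next` channel, then the completion signal.
fn run_demo() -> Vec<String> {
    let mut observer = LogObserver::default();
    observer.next(1);
    observer.next(2);
    observer.complete();
    observer.log
}

fn main() {
    for line in run_demo() {
        println!("{line}");
    }
}
```

Each channel is just a method call on the destination, which is why the rest of this page can talk about "sending a signal downstream" as calling one of these functions.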
Emission
The act of emitting a signal.
Inputs
Observers are things that can receive values, therefore they define their
input types using the ObserverInput trait. These types define the values
that are received by the next and error functions.
pub trait ObserverInput {
type In: Signal;
type InError: Signal;
}
Notifications
In some places you may encounter signals referred to as notifications. The distinction is that notifications are the materialized form of signals.
This is useful whenever you want to materialize all the different kinds of signals of something into one value, whichever signal that may have been. For example when sending them as an event, or serializing them.
This could be an enum like the ObserverNotification:
pub enum ObserverNotification<In, InError>
where
In: Signal,
InError: Signal,
{
Next(In),
Error(InError),
Complete,
}
Or it could be an event used in Bevy, like the SubscriberNotificationEvent.
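As a rough illustration of materialization, the sketch below records every received signal as a single enum value. The Notification enum here is a simplified copy without the Signal bound, and Recorder is a hypothetical type invented for this example; neither is part of the library.

```rust
/// Simplified, hypothetical notification type (no `Signal` bound).
#[derive(Debug, Clone, PartialEq)]
enum Notification<In, InError> {
    Next(In),
    Error(InError),
    Complete,
}

/// Hypothetical observer-like type that materializes every signal.
struct Recorder<In, InError> {
    notifications: Vec<Notification<In, InError>>,
}

impl<In, InError> Recorder<In, InError> {
    fn new() -> Self {
        Self { notifications: Vec::new() }
    }

    fn next(&mut self, value: In) {
        self.notifications.push(Notification::Next(value));
    }

    fn error(&mut self, error: InError) {
        self.notifications.push(Notification::Error(error));
    }

    fn complete(&mut self) {
        self.notifications.push(Notification::Complete);
    }
}

/// Records one value, then either an error or a completion.
fn record_demo(fail: bool) -> Vec<Notification<i32, String>> {
    let mut recorder = Recorder::new();
    recorder.next(1);
    if fail {
        recorder.error("boom".to_string());
    } else {
        recorder.complete();
    }
    recorder.notifications
}

fn main() {
    println!("{:?}", record_demo(false));
    println!("{:?}", record_demo(true));
}
```

Once signals are plain values like this, they can be stored, compared, serialized, or sent as events.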
Observable
You may already think of Observables as things that emit signals, but that’s not actually (strictly) true!
Observables are things that you can subscribe to with an observer as the destination of the resulting subscription! This resulting subscription is the source of signals!
Therefore, an observable is more like a piece of configuration based on which actual subscriptions can be created.
Some observables emit their values immediately and only return an already closed, “inert” subscription. For them, technically speaking, it was the observable that emitted those signals. For example, the of and the iterator observables both complete immediately on subscription.
This may seem like a superficial distinction to make as it still is the observable that you directly interact with, but it is important to understand how they work.
If we know that the state is always part of a subscription and not the observable, it’s clear that you can subscribe to the same observable multiple times, and all subscriptions are going to be unique, independent “instances”, with their own state!
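The idea that state lives in the subscription can be sketched as follows. CountUpTo and CountSubscription are hypothetical types made up for this example, and the subscription is pull-based purely to keep the sketch short (the library pushes signals into a destination instead).

```rust
/// Hypothetical observable: only configuration, no running state.
struct CountUpTo {
    limit: u32,
}

/// Hypothetical subscription: owns the state created by `subscribe`.
struct CountSubscription {
    next_value: u32,
    limit: u32,
}

impl CountUpTo {
    /// Every call creates a fresh, independent subscription.
    fn subscribe(&self) -> CountSubscription {
        CountSubscription { next_value: 1, limit: self.limit }
    }
}

impl CountSubscription {
    /// Advances this subscription only; returns `None` once finished.
    fn pull(&mut self) -> Option<u32> {
        if self.next_value > self.limit {
            return None;
        }
        let value = self.next_value;
        self.next_value += 1;
        Some(value)
    }
}

/// Advances the first subscription twice and the second one once.
fn independent_subscriptions_demo() -> (Option<u32>, Option<u32>, Option<u32>) {
    let observable = CountUpTo { limit: 3 };
    let mut first = observable.subscribe();
    let mut second = observable.subscribe();
    let a = first.pull();
    let b = first.pull();
    let c = second.pull(); // starts from the beginning, unaffected by `first`
    (a, b, c)
}

fn main() {
    println!("{:?}", independent_subscriptions_demo());
}
```

The observable is never mutated here: all progress is tracked by the subscriptions it created.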
Outputs
Observables define the types of what their subscriptions may emit, what errors (if any) they may produce:
pub trait ObservableOutput {
type Out: Signal;
type OutError: Signal;
}
Observables that do not emit errors (or values) use the Never type. Since Never is an empty enum, it is impossible to create a value of it! This ensures that if something says it won’t error, then it really can’t. (The Never type is actually an alias to the Infallible type used with the Result type!)
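Here is a small sketch of why an empty enum gives this guarantee, using the standard library's Infallible directly. The helper function names are hypothetical and exist only for this illustration.

```rust
use std::convert::Infallible;

/// A computation that declares, in its type, that it cannot fail.
fn always_ok(value: i32) -> Result<i32, Infallible> {
    Ok(value * 2)
}

/// Unwraps without any panic path: the `Err` arm can never run.
fn unwrap_infallible(result: Result<i32, Infallible>) -> i32 {
    match result {
        Ok(value) => value,
        // `Infallible` has no variants, so an empty match is exhaustive.
        Err(never) => match never {},
    }
}

fn main() {
    println!("{}", unwrap_infallible(always_ok(21)));
}
```

The compiler accepts the empty match because there is no value that could ever reach it.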
Example: Subscribing to an Observable
This example demonstrates subscribing to an observable using the
PrintObserver as the destination. Each value is emitted immediately,
one by one, and then the subscription completes.
let iterator_observable = IteratorObservable::new(1..=4);
let subscription = iterator_observable
.subscribe(PrintObserver::new("iterator_observable"));
Output:
iterator_observable - next: 1
iterator_observable - next: 2
iterator_observable - next: 3
iterator_observable - next: 4
iterator_observable - completed
iterator_observable - unsubscribed
Subscription
From the observable’s perspective, a subscription is an “instance” of an
observable. The most important function on it is the unsubscribe function,
which will stop the subscription, closing it.
From the subscription’s own perspective, it’s a value that represents owned
resources (state, teardown functions) that you can release by calling
unsubscribe.
The add and add_teardown methods on subscriptions let you add additional things to the subscription that will also be unsubscribed when the subscription unsubscribes. These can be other subscriptions, or just simple callbacks, aka teardowns.
A very important thing to learn here is that everything else (observables, operators, observers) is there just to create subscriptions. This is where state resides!
Teardown
A teardown function is an FnOnce that can be part of a subscription and will
be called on unsubscribe.
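A minimal sketch of the teardown idea follows. SketchSubscription is a hypothetical type for illustration only; the real subscription types in the library have more responsibilities, and the order in which they run teardowns is not something this sketch claims to reproduce.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical subscription that only stores teardown callbacks.
#[derive(Default)]
struct SketchSubscription {
    teardowns: Vec<Box<dyn FnOnce()>>,
}

impl SketchSubscription {
    /// Registers a callback to run when the subscription is released.
    fn add_teardown(&mut self, teardown: impl FnOnce() + 'static) {
        self.teardowns.push(Box::new(teardown));
    }

    /// Runs every stored teardown exactly once; later calls find none left.
    fn unsubscribe(&mut self) {
        for teardown in self.teardowns.drain(..) {
            teardown();
        }
    }
}

/// Registers two teardowns and unsubscribes twice.
fn teardown_demo() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut subscription = SketchSubscription::default();

    let log_a = Rc::clone(&log);
    subscription.add_teardown(move || log_a.borrow_mut().push("release a"));
    let log_b = Rc::clone(&log);
    subscription.add_teardown(move || log_b.borrow_mut().push("release b"));

    subscription.unsubscribe();
    subscription.unsubscribe(); // nothing left to run

    let result = log.borrow().clone();
    result
}

fn main() {
    println!("{:?}", teardown_demo());
}
```

Because each callback is an FnOnce that gets drained out of the list, it cannot run a second time.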
Combination Observables
Some observables are there to combine other observables into one. As each input observable can emit as many signals as they want, at their own pace, or maybe even never, there are many ways to combine two observables.
Some examples are:
- MergeObservable: A tuple of observables with common output types that emit their values concurrently into a common stream of events.
- ConcatObservable: A tuple of observables with common output types. It subscribes to one observable at a time, waits until it completes, and then subscribes to the next one, in order. (It has the exact same behavior as a MergeObservable with a concurrency_limit of 1!)
- CombineLatestObservable: Two observables emit into a tuple of each observable’s output type ((O1::Out, O2::Out)) whenever either of them emits, but only after each has had at least one emission, that is, once it is primed.
- ZipObservable: Two observables emit into a tuple of each observable’s output type ((O1::Out, O2::Out)) whenever, for an emission, there is a matching one from the other observable. The first emission of O1 will always be paired with the first emission of O2, the second emissions will be emitted together, and so on. This can lead to an excessive buildup of events when one is emitting fast and the other one is slow. The buffering behavior can be controlled through its options value.
Currently only 2 observables can be combined by each combinator. If you want more, just nest more of them together. (Or help implement varargs.)
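The difference between the zip and combine-latest pairing rules described above can be sketched over a fixed, interleaved list of emissions. Everything here (the Emission enum and both functions) is a hypothetical model of the pairing rules only, with no completion handling or buffer options; it is not the library's implementation.

```rust
use std::collections::VecDeque;

/// One emission from either of two hypothetical sources.
#[derive(Clone, Copy)]
enum Emission {
    Left(i32),
    Right(char),
}

/// Pairs the n-th left value with the n-th right value, buffering the faster side.
fn zip(events: &[Emission]) -> Vec<(i32, char)> {
    let mut lefts = VecDeque::new();
    let mut rights = VecDeque::new();
    let mut out = Vec::new();
    for event in events {
        match *event {
            Emission::Left(value) => lefts.push_back(value),
            Emission::Right(value) => rights.push_back(value),
        }
        if lefts.is_empty() || rights.is_empty() {
            continue;
        }
        if let (Some(left), Some(right)) = (lefts.pop_front(), rights.pop_front()) {
            out.push((left, right));
        }
    }
    out
}

/// Emits the latest pair on every emission, once both sides have emitted.
fn combine_latest(events: &[Emission]) -> Vec<(i32, char)> {
    let mut latest_left = None;
    let mut latest_right = None;
    let mut out = Vec::new();
    for event in events {
        match *event {
            Emission::Left(value) => latest_left = Some(value),
            Emission::Right(value) => latest_right = Some(value),
        }
        // Primed only after each side has produced at least one value.
        if let (Some(left), Some(right)) = (latest_left, latest_right) {
            out.push((left, right));
        }
    }
    out
}

/// A fast left source interleaved with a slower right source.
fn sample_events() -> Vec<Emission> {
    vec![
        Emission::Left(1),
        Emission::Left(2),
        Emission::Right('a'),
        Emission::Left(3),
        Emission::Right('b'),
    ]
}

fn main() {
    println!("zip:            {:?}", zip(&sample_events()));
    println!("combine_latest: {:?}", combine_latest(&sample_events()));
}
```

Note how zip has to hold on to left values until a partner arrives, which is exactly where the buildup mentioned above comes from.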
Primed
Some observables do not emit anything until they are primed. For every subscription, this can happen at most once, and remains true for the remainder of its duration.
For example, some combination observables like combine_latest and zip
emit values taken from all of their input observables, so it’s impossible
for them to emit anything until this condition is met. Once it is met, the
subscription to them can be considered primed, and expected to emit values.
Where priming matters is completion. If an upstream completion prevents priming, downstream should immediately complete too. Once primed, the condition to complete will depend on the observable.
Operators
Operators take an observable as input and return a new observable as output, enhancing the original observable with new behavior.
Composable Operators
Composable Operators are a subset of regular Operators. Unlike - for
example - the retry operator, that (as the name suggests) retries
subscription to the source, many other operators do not interact with their
source observable beyond just subscribing to them once.
All composable operators do is either, or both, of the following:
- Wrap the destination into a subscriber on subscribe
- Interact with the destination on subscribe
The start_with and finalize operators don’t create anything new on subscribe, they only interact with the destination subscriber.
But they don’t know anything about what the source observable is.
Why though?
This enables 2 things:
- Simpler implementation for a lot of operators! They can skip implementing the actual operator that stores the source observable it wraps. This layer is implemented automatically by the Pipe operator/observable, whose sole job is to combine a source observable and a composable operator.
- Composite operators (behind the compose feature)! Composite operators are (composable) operators made from other composable operators!
let my_operator = compose_operator::<i32, Never>()
    .map(|i| i * 2)
    .filter(|i| i < &4);
Composite operators are a convenient way of creating a new operator without having to implement one from scratch. The obvious limitation is that they can only use the composable subset of operators. So no retry, no share.
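The "wrap the destination" idea can be sketched without any knowledge of the source. The Observer trait, Collect, MapSubscriber and FilterSubscriber below are hypothetical, stripped-down types (a single next channel, i32 values only) written for this illustration; they mirror the map-then-filter composition but are not the library's types.

```rust
/// Minimal stand-in for an observer with only a `next` channel.
trait Observer {
    fn next(&mut self, value: i32);
}

/// Final destination: collects whatever reaches it.
#[derive(Default)]
struct Collect(Vec<i32>);

impl Observer for Collect {
    fn next(&mut self, value: i32) {
        self.0.push(value);
    }
}

/// Subscriber a hypothetical `map` operator would wrap the destination in.
struct MapSubscriber<D, F> {
    destination: D,
    mapper: F,
}

impl<D: Observer, F: FnMut(i32) -> i32> Observer for MapSubscriber<D, F> {
    fn next(&mut self, value: i32) {
        self.destination.next((self.mapper)(value));
    }
}

/// Subscriber a hypothetical `filter` operator would wrap the destination in.
struct FilterSubscriber<D, P> {
    destination: D,
    predicate: P,
}

impl<D: Observer, P: FnMut(&i32) -> bool> Observer for FilterSubscriber<D, P> {
    fn next(&mut self, value: i32) {
        if (self.predicate)(&value) {
            self.destination.next(value);
        }
    }
}

/// Wraps `Collect` in `filter`, then in `map`, and feeds it from any source.
fn double_then_keep_below_four(source: impl IntoIterator<Item = i32>) -> Vec<i32> {
    let mut chain = MapSubscriber {
        mapper: |i: i32| i * 2,
        destination: FilterSubscriber {
            predicate: |i: &i32| *i < 4,
            destination: Collect::default(),
        },
    };
    for value in source {
        chain.next(value);
    }
    chain.destination.destination.0
}

fn main() {
    println!("{:?}", double_then_keep_below_four(0..=3));
}
```

Neither subscriber ever refers to the source: the loop at the end plays the role of whatever combines a source with the wrapped destination.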
Subscribers
A subscriber is something that’s both an observer and a subscription at the same time!
Most of the time, they wrap another observer or subscriber, which means you can have a deeply nested series of subscribers, in which the deepest element is usually a regular observer, the true destination. And this whole nested structure lives in a subscription!
A single subscriber usually implements a single, easily understandable behavior, that it applies by reacting to upstream emissions, and producing different downstream emissions.
Downstream & Upstream from the Subscriber’s Perspective
In the context of observables and operators, downstream refers to the
destination, where signals are sent, and upstream refers to the source,
the caller of the next, error and complete functions.
For example, looking at the map operator’s next implementation:
fn next(
&mut self,
next: Self::In, // This is coming from upstream
) {
let mapped = (self.mapper)(next);
self.destination.next(mapped); // And this is sending it downstream
}
Downstream & Upstream from the Operator’s Perspective
If we zoom out where this operator is used:
let _s = (1..=5)
.into_observable() // Relative to the `map` operator, this `IteratorObservable` is upstream
.map(|i| i * 2)
.skip(1) // And this `skip` operator is downstream
.subscribe(PrintObserver::new("map_operator")); // The `PrintObserver` is also downstream relative to `map`.
UpgradeableObserver
When subscribing to an Observable, sometimes we want the observable to be able to send an unsubscribe call to this destination, and sometimes it should be detached.
Remember: A subscriber is both an observer and a subscription
Regular subscribers implement UpgradeableObserver by returning themselves, as
there is nothing to upgrade to become a subscriber. Observers do not have a
SubscriptionLike impl, so they need to pick another subscriber to wrap
themselves in when used as a destination in a subscribe call. This is
usually the detached implementation for observers.
Detached Subscriber
A subscriber is detached if it completely avoids sending unsubscribe, or in
some cases even complete signals.
Detached subscribers can’t unsubscribe downstream, serving as a hard boundary for unsubscription.
Why do errors have their own channel?
Since each operator and subscriber implements and does only one thing, dealing
with erroneous values in every operator would be very tedious. Imagine that
when you have an observable that emits Results because it’s fallible, your
mappers would need to do an inner map:
fallible_observable
.map(|i_result| i_result.map(|i| i * 2))
.subscribe(...);
In case you do want to move errors between the error and next channels, you can use the into_result operator to combine all upstream next and error signals into only next signals downstream, changing the downstream error type to Never.
And using the lift_error operator, you can unpack upstream Result values into downstream next and error signals. (In this case, you actually have 2 separate error types: that of the upstream error signal, and the error type of the upstream next Results. This is why you need to supply an error mapping function to this operator.)
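The channel-moving behavior of these two operators can be sketched per signal with plain functions. The Notification enum and both functions below are hypothetical models written for this illustration. In particular, this sketch assumes that the mapping function converts the upstream error-channel type into the Result's error type; the actual operator signatures may differ.

```rust
use std::convert::Infallible;

/// Hypothetical, simplified signal representation for this sketch.
#[derive(Debug, PartialEq)]
enum Notification<T, E> {
    Next(T),
    Error(E),
    Complete,
}

/// `into_result`-like step: both channels end up on `next`,
/// so the downstream error type becomes uninhabited.
fn into_result<T, E>(signal: Notification<T, E>) -> Notification<Result<T, E>, Infallible> {
    match signal {
        Notification::Next(value) => Notification::Next(Ok(value)),
        Notification::Error(error) => Notification::Next(Err(error)),
        Notification::Complete => Notification::Complete,
    }
}

/// `lift_error`-like step: `Err` values move to the `error` channel.
/// Assumption: `map_upstream_error` unifies the two error types.
fn lift_error<T, E, UpErr>(
    signal: Notification<Result<T, E>, UpErr>,
    map_upstream_error: impl FnOnce(UpErr) -> E,
) -> Notification<T, E> {
    match signal {
        Notification::Next(Ok(value)) => Notification::Next(value),
        Notification::Next(Err(error)) => Notification::Error(error),
        Notification::Error(error) => Notification::Error(map_upstream_error(error)),
        Notification::Complete => Notification::Complete,
    }
}

fn main() {
    let merged = into_result::<i32, String>(Notification::Error("boom".to_string()));
    println!("{merged:?}");

    let lifted = lift_error(
        Notification::<Result<i32, String>, u8>::Next(Err("bad value".to_string())),
        |code| format!("upstream code {code}"),
    );
    println!("{lifted:?}");
}
```

The second type parameter of the into_result output being Infallible is the same trick described in the Outputs section: the error channel still exists, but nothing can be sent on it.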
Other Operators
More detailed information on individual operators and their behavior can be seen in their documentation page here in the book, or their package readme (which are the same documents).
The most important information on them is also available on the operators and (primarily) on the extension functions themselves, for easy access during development!
Subjects
A subject is something that is both an observable and an observer at the same time!
This makes subjects capable of feeding data into subscriptions from the “outside”!
Run this example:
cargo run --package rx_core_subject_publish --example subject_example
let mut subject = PublishSubject::<i32>::default();
subject.next(1); // Meteora - Track 11
let mut subscription = subject
    .clone()
    .finalize(|| println!("finalize")) // produces the "finalize" line in the output
    .subscribe(PrintObserver::<i32>::new("subject_example"));
subject.next(2);
subject.next(3);
subscription.unsubscribe();
subject.next(4);
Output:
subject_example - next: 2
subject_example - next: 3
finalize
subject_example - unsubscribed
We can clearly see that only those values were observed that were emitted while the subscription was active!
Multicasting
As with any observable, a subject can be subscribed to multiple times! This means subjects are fundamentally multicasting!
Whenever you put a value into it, all of its subscribers will receive it.
Once unsubscribed, no new values can be emitted by the subject. New subscriptions attempted on the subject will be immediately unsubscribed.
Example:
Run this example:
cargo run --package rx_core_subject_publish --example subject_multicasting_example
let mut subject = PublishSubject::<i32>::default();
subject.next(1);
let mut subscription_1 = subject
.clone()
.finalize(|| println!("finalize subscription 1"))
.subscribe(PrintObserver::<i32>::new("subject_subscription_1"));
subject.next(2);
let _subscription_2 = subject
.clone()
.finalize(|| println!("finalize subscription 2"))
.subscribe(PrintObserver::<i32>::new("subject_subscription_2"));
subject.next(3);
subscription_1.unsubscribe();
subject.next(4);
Output:
subject_subscription_1 - next: 2
subject_subscription_1 - next: 3
subject_subscription_2 - next: 3
finalize subscription 1
subject_subscription_1 - unsubscribed
subject_subscription_2 - next: 4
finalize subscription 2
subject_subscription_2 - unsubscribed
You can see that the signal 3 was received by both subscriptions! And each
subscription had its own finalize callback! Each individual subscription is
unique and can have as many or as few operators on it as you want!
PublishSubject
The vanilla subject, it multicasts incoming values to currently active subscribers.
Only completion and error signals are replayed to new subscribers if the subject was finished. If the subject was also unsubscribed, the new subscription too will be immediately unsubscribed.
As all other subjects are just wrappers around PublishSubject, this behavior is shared across all of them.
BehaviorSubject
A BehaviorSubject is a subject that always has exactly 1 value of its input type stored, therefore to create a new BehaviorSubject, you must provide an initial value.
Immediately when subscribed to, the currently stored value will be emitted! (This is the initial value until something else is pushed in.)
This makes the BehaviorSubject ideal to be used as reactive storage: a value that can change over time, where subscribers always react to the latest value without having to wait for it!
BehaviorSubjects continue to replay even after being unsubscribed, but they can’t receive new values, and the new subscription will be immediately unsubscribed. They do not replay, however, once they have errored!
Not every type of value can have a sensible default, or even if they do, sometimes it doesn’t make sense to use it in the context! In that case, use a ReplaySubject<1, _>!
Example:
Run this example:
cargo run --package rx_core_subject_behavior --example behavior_subject_example
let mut subject = BehaviorSubject::<i32>::new(10);
// Immediately prints "hello - next: 10"
let mut hello_subscription = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hello"));
subject.next(11);
let _s1 = subject
.clone()
.map(|next| next * 2)
.subscribe(PrintObserver::<i32>::new("hi double"));
subject.next(12);
hello_subscription.unsubscribe();
subject.next(13);
subject.complete();
let mut _completed_subscription = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hello_completed"));
Output:
hello - next: 10
hello - next: 11
hi double - next: 22
hello - next: 12
hi double - next: 24
hello - unsubscribed
hi double - next: 26
hi double - completed
hi double - unsubscribed
hello_completed - next: 13
hello_completed - completed
hello_completed - unsubscribed
ReplaySubject
A ReplaySubject is a subject that buffers the last N emissions, and when
subscribed to immediately replays all of them!
Unlike BehaviorSubject, it does not guarantee that a value is always present, because it does not require you to define some values to create it.
Like BehaviorSubjects, ReplaySubjects continue to replay even after being unsubscribed, but they can’t receive new values, and the new subscription will be immediately unsubscribed.
You can still next some values immediately into it if you want!
This makes the ReplaySubject ideal to cache something that does not have a sensible default, initial value!
For example: You’re waiting for a height measurement, which is a number, and
numbers have a default value of 0. Some pipelines downstream take this
measurement and calculate some things for you. It does not make sense to run
that computation with the value 0, as it’s not an actual measurement, just a
default. For this situation you can use either a ReplaySubject<1, f32> or a
BehaviorSubject<Option<f32>>(None). The latter fits when you want things to
start immediately even if there is no actual value yet, or when you want to be
able to return to an initial, “uninitialized” state.
Example:
Run this example:
cargo run --package rx_core_subject_replay --example replay_subject_example
let mut subject = ReplaySubject::<2, i32>::default();
// Doesn't print out anything on subscribe
let _s = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hello"));
subject.next(1);
subject.next(2);
subject.next(3);
// Only the last two values are printed out, since our capacity is just 2
let _s2 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("hi"));
subject.next(4);
subject.next(5);
Output:
hello - next: 1
hello - next: 2
hello - next: 3
hi - next: 2
hi - next: 3
hello - next: 4
hi - next: 4
hello - next: 5
hi - next: 5
hi - unsubscribed
hello - unsubscribed
When the second subscription subscribed, the buffer contained [2, 3], which was
immediately received by the new subscription!
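The buffering rule can be sketched with a fixed-capacity queue. ReplayBuffer is a hypothetical type written for this illustration, and it only models the "keep the last N values" part, not subscriptions or multicasting.

```rust
use std::collections::VecDeque;

/// Hypothetical buffer keeping only the last `N` values.
struct ReplayBuffer<const N: usize, T> {
    values: VecDeque<T>,
}

impl<const N: usize, T: Clone> ReplayBuffer<N, T> {
    fn new() -> Self {
        Self { values: VecDeque::with_capacity(N) }
    }

    /// Stores a value, evicting the oldest one when the buffer is full.
    fn push(&mut self, value: T) {
        if N == 0 {
            return;
        }
        if self.values.len() == N {
            self.values.pop_front();
        }
        self.values.push_back(value);
    }

    /// What a new subscriber would receive immediately, oldest first.
    fn replay(&self) -> Vec<T> {
        self.values.iter().cloned().collect()
    }
}

/// Pushes 1, 2, 3 into a buffer of capacity 2.
fn replay_demo() -> Vec<i32> {
    let mut buffer = ReplayBuffer::<2, i32>::new();
    for value in 1..=3 {
        buffer.push(value);
    }
    buffer.replay()
}

fn main() {
    println!("{:?}", replay_demo());
}
```

With a capacity of 2, the value 1 has already been evicted by the time 3 arrives, matching what the second subscription saw in the example output.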
AsyncSubject
The AsyncSubject will only emit once it completes.
Late subscribers who subscribe after it has already completed will also receive the last result, followed immediately by a completion signal.
What it will emit on completion depends on the reducer function used.
By default, it just replaces the result with the most recent observed
value next-ed into the subject.
But you can also specify your own reducer to accumulate all observed
values to be the result on completion.
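The role of the reducer can be sketched like this. AsyncResult and the reducer signature (previous result plus new value in, new result out) are assumptions made for this illustration; the actual reducer API of AsyncSubject may look different.

```rust
/// Hypothetical core of an AsyncSubject-like type: stores one result.
struct AsyncResult<T, R> {
    result: Option<T>,
    reducer: R,
}

impl<T, R: Fn(Option<T>, T) -> T> AsyncResult<T, R> {
    fn new(reducer: R) -> Self {
        Self { result: None, reducer }
    }

    /// Folds an observed value into the stored result; nothing is emitted yet.
    fn next(&mut self, value: T) {
        let previous = self.result.take();
        self.result = Some((self.reducer)(previous, value));
    }

    /// On completion, the accumulated result (if any) is released.
    fn complete(self) -> Option<T> {
        self.result
    }
}

/// Default-like behavior: the most recent value replaces the result.
fn keep_latest(values: &[i32]) -> Option<i32> {
    let mut state = AsyncResult::new(|_previous: Option<i32>, latest: i32| latest);
    for &value in values {
        state.next(value);
    }
    state.complete()
}

/// Custom reducer: accumulate every observed value into a sum.
fn sum_all(values: &[i32]) -> Option<i32> {
    let mut state =
        AsyncResult::new(|previous: Option<i32>, latest: i32| previous.unwrap_or(0) + latest);
    for &value in values {
        state.next(value);
    }
    state.complete()
}

fn main() {
    println!("{:?} {:?}", keep_latest(&[1, 2, 3]), sum_all(&[1, 2, 3]));
}
```

In both cases nothing leaves the state until complete is called, which is the defining trait of this subject.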
Example:
Run this example:
cargo run --package rx_core_subject_async --example async_subject_example
let mut subject = AsyncSubject::<i32>::default();
let mut _subscription_1 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("async_subject sub_1"));
subject.next(1);
subject.next(2);
let mut _subscription_2 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("async_subject sub_2"));
subject.next(3);
subject.complete();
let mut _subscription_3 = subject
.clone()
.subscribe(PrintObserver::<i32>::new("async_subject sub_3"));
Output:
async_subject sub_1 - next: 3
async_subject sub_2 - next: 3
async_subject sub_1 - completed
async_subject sub_2 - completed
async_subject sub_1 - unsubscribed
async_subject sub_2 - unsubscribed
async_subject sub_3 - next: 3
async_subject sub_3 - completed
async_subject sub_3 - unsubscribed
Scheduling
Every example so far was “immediate”, they either emitted all their values immediately, or - in the case of subjects - when we pushed values into them.
But what really makes observables useful is when they can react to things and emit a signal when they do! And for that, a subscription or a subscriber needs to be able to emit signals even without upstream signals triggering its own logic.
This requires something that runs in the “background” to drive the work issued by subscribers or subscriptions.
For example: “Do a next call on this, 2 seconds from now!” or “Call next on this every 200ms from now, until I say stop!”
Scheduler
The scheduler is a shared queue that subscribers have access to (always passed in by the user) to issue work to be executed.
Executor
The executor is responsible for driving the work delegated to the scheduler and collected in the scheduler’s queue. It owns the scheduler, and handles to the scheduler can be acquired from the executor.
Work
There are multiple types of work, depending on how they are handled with respect to time, so that the issuer of the work does not have to reimplement basic time-based logic like a delay.
Work can be issued, cancelled, or invoked from the scheduler queue.
Immediate Work
The simplest type of work, it executes as soon as the executor receives it.
Delayed Work
Delayed work will be executed only after its specified delay has passed.
Repeated Work
Repeated work re-executes its work each time it repeats, after a specified time interval.
Continuous Work
Continuous work is like repeated work but without the time interval: it simply executes as often as it can.
It is up to the executor to define the actual frequency this type of work runs at.
- With the tickable executor, this means on every tick call.
  - In Bevy, this means once every frame.
- In an async executor, this would be set to a reasonably fast interval, like a target FPS.
Invoked Work
Invoked work is not executed automatically. Instead, it can be “invoked” by
its invoke_id, which means executing it as soon as the executor can.
For a ticking executor, this means the next tick after invocation.
Scheduler Context
Executors define a context, passed in as a mutable reference to the work
whenever it is executed. The main job of the context is to provide the
current time (as a Duration, denoting the time passed since startup).
A context can also offer executor-specific capabilities, for example interacting with the Bevy ECS world.
The context is defined by the executor, but subscribers can be written for specific contexts too. Resulting in context specific subscribers with extra capabilities relevant only in that context, compatible only with that executor.
Scheduler Work Input
Most generic scheduled subscribers do not need to know about anything besides the time coming from the context. Still, some executors can provide extra data relevant to the execution of the work at that moment.
For example: In the TickingExecutor, the Tick object is passed into every executed work.
TickingExecutor
The base executor, used both for tests and for Bevy.
It can be manually advanced by calling tick.
Time can only move forwards in the executor!
Interval Example
let mut mock_executor = MockExecutor::new_with_logging();
let scheduler = mock_executor.get_scheduler_handle();
let mut interval_observable = IntervalObservable::new(
IntervalObservableOptions {
duration: Duration::from_secs(1),
max_emissions_per_tick: 3,
start_on_subscribe: true,
},
scheduler,
);
let _subscription = interval_observable.subscribe(PrintObserver::new("interval_observable"));
mock_executor.tick(Duration::from_millis(600));
mock_executor.tick(Duration::from_millis(401));
mock_executor.tick(Duration::from_millis(16200)); // lag spike! would result in 16 emissions, but the limit is 3!
mock_executor.tick(Duration::from_millis(1200));
mock_executor.tick(Duration::from_millis(2200));
Output:
interval_observable - next: 0
Ticking... (600ms)
Ticking... (401ms)
interval_observable - next: 1
Ticking... (16.2s)
interval_observable - next: 2
interval_observable - next: 3
interval_observable - next: 4
Ticking... (1.2s)
interval_observable - next: 5
Ticking... (2.2s)
interval_observable - next: 6
interval_observable - next: 7
interval_observable - unsubscribed
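One way to reason about the output above is the following model of a tick-driven interval with a per-tick emission cap. IntervalModel is a hypothetical type, and the bookkeeping (accumulate elapsed time, keep only the remainder, drop emissions over the cap) is an assumption chosen because it reproduces the printed sequence; the actual IntervalObservable may be implemented differently.

```rust
use std::time::Duration;

/// Hypothetical model of an interval advanced by manual ticks.
struct IntervalModel {
    period: Duration,
    max_emissions_per_tick: u32,
    elapsed: Duration,
    next_value: u32,
}

impl IntervalModel {
    fn new(period: Duration, max_emissions_per_tick: u32) -> Self {
        assert!(!period.is_zero(), "period must be non-zero");
        Self { period, max_emissions_per_tick, elapsed: Duration::ZERO, next_value: 0 }
    }

    /// Produces the next counter value.
    fn emit(&mut self) -> u32 {
        let value = self.next_value;
        self.next_value += 1;
        value
    }

    /// Advances time; returns the values emitted during this tick.
    fn tick(&mut self, delta: Duration) -> Vec<u32> {
        self.elapsed += delta;
        let due = self.elapsed.as_nanos() / self.period.as_nanos();
        let remainder = self.elapsed.as_nanos() % self.period.as_nanos();
        // Only the remainder carries over; emissions over the cap are dropped.
        self.elapsed = Duration::from_nanos(remainder as u64);
        let count = due.min(u128::from(self.max_emissions_per_tick)) as u32;
        (0..count).map(|_| self.emit()).collect()
    }
}

/// Replays the tick sequence from the example, one batch per step.
fn run_interval_demo() -> Vec<Vec<u32>> {
    let mut interval = IntervalModel::new(Duration::from_secs(1), 3);
    // `start_on_subscribe: true` is modelled as one emission up front.
    let mut batches = vec![vec![interval.emit()]];
    for millis in [600, 401, 16_200, 1_200, 2_200] {
        batches.push(interval.tick(Duration::from_millis(millis)));
    }
    batches
}

fn main() {
    for batch in run_interval_demo() {
        println!("{batch:?}");
    }
}
```

In this model the 16.2s lag spike has 16 intervals due, but only 3 are emitted and the rest are discarded, which is why the following 1.2s tick emits a single value instead of catching up.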