Contracts
These contracts are intended for people who write observables or operators to ensure correct operation.
This isn’t a guide on how to create your own observables and operators, but a set of additional rules to check after you have made one.
To learn the basics of how to write your own observables and operators, see Writing Observables and Writing Operators.
To learn how to write them in the same fashion as rx_core and rx_bevy do, with separate crates for every observable/operator and an aggregator crate, see contributing.md.
Every observable and operator must uphold these invariants to ensure the expected runtime behavior. If they aren’t met, it should be treated as a bug!
Contracts are identified by their “contract code”, always starting with
rx_contract_.
Each contract features one or more verifications identified by
“verification codes”, always starting with rx_verify_.
Tests
It is highly advised to have at least one test for each contract defined here wherever applicable!
Each contract should have its own test with the same name as the contract. The test should feature individual assertions with the verification code as part of the failure message.
rx_core_testing contains test harnesses that can test for some of these contracts, saving time implementing the tests and ensuring every verification and extra assertion is made.
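To illustrate the naming convention, here is a minimal sketch of such a test. RecordingSubscriber and Notification are hypothetical stand-ins written for this example; they are not the rx_core or rx_core_testing API. In a real crate the function would carry the #[test] attribute.

```rust
// Hypothetical, simplified stand-ins; not the rx_core or rx_core_testing API.
#[derive(Debug, PartialEq)]
enum Notification {
    Next(i32),
    Error(String),
}

#[derive(Default)]
struct RecordingSubscriber {
    observed: Vec<Notification>,
    closed: bool,
}

impl RecordingSubscriber {
    fn next(&mut self, value: i32) {
        if !self.closed {
            self.observed.push(Notification::Next(value));
        }
    }

    fn error(&mut self, error: &str) {
        if !self.closed {
            self.observed.push(Notification::Error(error.to_string()));
            self.closed = true; // An emitted error closes the subscriber.
        }
    }

    fn is_closed(&self) -> bool {
        self.closed
    }
}

// The test is named after the contract it covers.
// Every assertion message starts with the verification code.
fn rx_contract_closed_after_error() {
    let mut subscriber = RecordingSubscriber::default();
    subscriber.next(1);
    subscriber.error("boom");

    assert_eq!(
        subscriber.observed.last(),
        Some(&Notification::Error("boom".to_string())),
        "rx_verify_errored: the last observed notification was not the Error"
    );
    assert!(
        subscriber.is_closed(),
        "rx_verify_closed: is_closed returned false after an Error"
    );

    let emissions_before = subscriber.observed.len();
    subscriber.next(2);
    assert_eq!(
        subscriber.observed.len(),
        emissions_before,
        "rx_verify_no_new_notification_after_closed: next emitted after closing"
    );
}
```

When an assertion fails, the verification code in the message points straight at the rule that was broken.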
rx_contract_closed_after_error
Applies to:
- Observables
- Operators
- Subscribers
Once a subscriber emits an Error notification, it is considered “errored”,
and it should be closed and its teardowns executed.
A subscriber is considered errored when it emits an error signal, not when it receives one! Some operators are designed to handle errors.
Test must verify:
- rx_verify_errored: An Error notification was observed.
- rx_verify_closed: is_closed returns true after an Error notification was observed.
- rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
- If Observable or Operator:
  - rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
  - rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
- If Operator:
  - rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
- If there are input observables:
  - rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
- If Scheduled:
  - rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.
rx_contract_closed_after_complete
Applies to:
- Observables
- Operators
- Subscribers
Once an observable or a subscriber emits a Complete notification, it is
considered “completed”, and it should be closed and its teardowns executed.
A subscriber is considered completed when it emits a complete signal, not when it receives one! Some operators complete later, for example: delay.
Test must verify:
- rx_verify_completed: A Complete notification was observed.
- rx_verify_closed: is_closed returns true after a Complete notification was observed.
- rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
- If Observable or Operator:
  - rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
  - rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
- If Operator:
  - rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
- If there are input observables:
  - rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
- If Scheduled:
  - rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.
rx_contract_closed_after_unsubscribe
Applies to:
- Observables
- Operators
- Subscribers
An observable is not considered completed or errored when its subscription is unsubscribed. It’s a cancellation.
Test must verify:
- rx_verify_unsubscribed: An Unsubscribe notification was observed.
- rx_verify_closed: is_closed returns true after an Unsubscribe notification was observed.
- rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
- If Observable or Operator:
  - rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
  - rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
- If Operator:
  - rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
- If there are input observables:
  - rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
- If Scheduled:
  - rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.
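The teardown and closedness verifications can be pictured with a small sketch. SketchSubscription is a hypothetical type invented for this example and does not mirror the real rx_core subscription types; it only shows the behavior the verifications describe: unsubscribing closes the subscription, runs every teardown once, and a repeated unsubscribe does nothing.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Hypothetical subscription type; the real rx_core types differ.
#[derive(Default)]
struct SketchSubscription {
    teardowns: Vec<Box<dyn FnOnce()>>,
    closed: bool,
}

impl SketchSubscription {
    fn add_teardown(&mut self, teardown: impl FnOnce() + 'static) {
        self.teardowns.push(Box::new(teardown));
    }

    fn unsubscribe(&mut self) {
        if self.closed {
            return; // Already closed: a second unsubscribe must not do anything.
        }
        self.closed = true;
        // Draining guarantees that each teardown runs exactly once.
        for teardown in self.teardowns.drain(..) {
            teardown();
        }
    }

    fn is_closed(&self) -> bool {
        self.closed
    }
}

// Returns (is_closed, number of teardown executions) after unsubscribing twice.
fn teardown_runs_on_unsubscribe() -> (bool, u32) {
    let executions = Rc::new(Cell::new(0u32));
    let counter = Rc::clone(&executions);

    let mut subscription = SketchSubscription::default();
    subscription.add_teardown(move || counter.set(counter.get() + 1));

    subscription.unsubscribe();
    subscription.unsubscribe(); // Cancelling twice must not re-run teardowns.

    (subscription.is_closed(), executions.get())
}
```

A contract test would assert that the subscription is closed (rx_verify_closed) and that the counter is exactly 1 (rx_verify_subscription_teardowns_executed).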
rx_contract_closed_if_downstream_closes_early
Applies to:
- Observables
- Operators
- Subscribers
A subscription must be closed if a downstream operator like take(1+) closes
it early.
Test must verify:
- rx_verify_closed: is_closed returns true after an Unsubscribe notification was observed.
- rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
- If Observable or Operator:
  - rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
  - rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
- If Operator:
  - rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
- If there are input observables:
  - rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
- If Scheduled:
  - rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.
rx_contract_closed_if_downstream_closes_immediately
Applies to:
- Observables
- Operators
- Subscribers
A subscription must be closed if a downstream operator like take(0) closes it
immediately.
Test must verify:
- rx_verify_closed: is_closed returns true after an Unsubscribe notification was observed.
- rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
- If Observable or Operator:
  - rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
  - rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
- If Operator:
  - rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
- If there are input observables:
  - rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
- If Scheduled:
  - rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.
rx_contract_immediate_completion
Applies to:
- Observables
- Operators (if they can complete on their own)
- Subscribers (if they can complete on their own)
Once it is known that further emissions are impossible, completion should be immediate.
For example, knowing when an iterator is finished is trivial: after the last next, a complete must immediately follow.
But combination observables have to deduce their own completion based on the Observables they combine:
- CombineLatestObservable, when already primed, completes only when all of its inner observables have finished emitting values! Before it’s primed, it completes when any of its inner observables complete or unsubscribe, as priming then becomes impossible.
- ZipObservable completes when any of its inner observables have finished emitting values!
Test must verify:
- rx_verify_immediately_completed: After the last next signal, a complete signal should be observed immediately.
rx_contract_do_not_complete_until_necessary
Applies to:
- Combination Observables
Combination observables should not complete until it becomes impossible to emit further values.
Test must verify:
- rx_verify_not_closed: If a single input observable unsubscribed, but another one can still trigger emissions, the observable itself should not complete yet.
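As a rough sketch of how a combination observable could decide when to complete, the function below models the CombineLatestObservable rule as a pure predicate over its inputs. InputState and combine_latest_should_complete are hypothetical names made up for this example, and the sketch assumes one reading of the rule: priming becomes impossible when an input finishes before emitting its first value.

```rust
// Hypothetical bookkeeping for one input of a combination observable.
#[derive(Clone, Copy)]
struct InputState {
    has_emitted: bool, // The input produced at least one value.
    finished: bool,    // The input completed or unsubscribed.
}

// Assumed rule for a combine-latest style observable:
// complete as soon as no further emission is possible, and not earlier.
fn combine_latest_should_complete(inputs: &[InputState]) -> bool {
    // An input that finished without ever emitting makes priming impossible.
    let priming_impossible = inputs
        .iter()
        .any(|input| input.finished && !input.has_emitted);
    // Once every input is finished, nothing can trigger a new emission.
    let all_finished = inputs.iter().all(|input| input.finished);

    priming_impossible || all_finished
}

// A few named states to make the scenarios below easier to read.
const EMITTING: InputState = InputState { has_emitted: true, finished: false };
const DONE_WITH_VALUE: InputState = InputState { has_emitted: true, finished: true };
const DONE_WITHOUT_VALUE: InputState = InputState { has_emitted: false, finished: true };
```

With these states, [DONE_WITH_VALUE, EMITTING] must stay open because the second input can still trigger emissions (rx_verify_not_closed), while [DONE_WITHOUT_VALUE, EMITTING] and [DONE_WITH_VALUE, DONE_WITH_VALUE] should complete immediately.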
Additional Guidelines
These are additional guidelines to better adhere to the contracts. Some of them are indirectly verified by contracts, some of them are not testable. Either way these do not need their own tests, and as such, can’t be considered contracts by themselves. More like reminders, or suggestions.
Operator Subscribers must always forward all upstream signals downstream unless altering them is their expected behavior
Downstream operators depend on signals too, don’t forget to forward them!
The map operator’s only job is to turn the upstream next signal into its mapped value and forward it downstream. It does not alter the behavior of error, complete and unsubscribe, so it must call the same function on its destination.
For example:
fn complete(&mut self) {
    // Forward the signal unchanged to the destination.
    self.destination.complete();
}
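A fuller, self-contained sketch of the same idea follows. SketchObserver, MapSubscriber and Recorder are simplified, hypothetical types written for this example (fixed to i32 values and String errors); the real rx_core traits are generic and look different.

```rust
// Hypothetical, simplified observer trait; the real rx_core traits differ.
trait SketchObserver {
    fn next(&mut self, value: i32);
    fn error(&mut self, error: String);
    fn complete(&mut self);
    fn unsubscribe(&mut self);
}

struct MapSubscriber<Destination: SketchObserver> {
    destination: Destination,
    mapper: fn(i32) -> i32,
}

impl<Destination: SketchObserver> SketchObserver for MapSubscriber<Destination> {
    // The only signal this operator alters.
    fn next(&mut self, value: i32) {
        self.destination.next((self.mapper)(value));
    }

    // Everything else is forwarded untouched.
    fn error(&mut self, error: String) {
        self.destination.error(error);
    }

    fn complete(&mut self) {
        self.destination.complete();
    }

    fn unsubscribe(&mut self) {
        self.destination.unsubscribe();
    }
}

// Destination that records every signal it receives.
#[derive(Default)]
struct Recorder {
    log: Vec<String>,
}

impl SketchObserver for Recorder {
    fn next(&mut self, value: i32) {
        self.log.push(format!("next({value})"));
    }
    fn error(&mut self, error: String) {
        self.log.push(format!("error({error})"));
    }
    fn complete(&mut self) {
        self.log.push("complete".to_string());
    }
    fn unsubscribe(&mut self) {
        self.log.push("unsubscribe".to_string());
    }
}

fn run_map_sketch() -> Vec<String> {
    let mut map = MapSubscriber {
        destination: Recorder::default(),
        mapper: |value| value * 10,
    };
    map.next(1);
    map.complete();
    map.unsubscribe();
    map.destination.log
}
```

If complete or unsubscribe were left empty in MapSubscriber, the recorder would only ever see next(10), and any operator downstream of it would never learn that the stream had ended.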
No unnecessary .is_closed() checks
Only newly produced signals should check if the destination is still open!
For an observable, this does mean every individual signal, as they originate from there.
For example, map only transforms values. Upstream won’t ever send anything
after it’s closed. map only interacts with downstream once each time upstream
interacts with it, and its is_closed returns downstream’s is_closed state.
Therefore, if downstream closes early, an upstream producer won’t even try
interacting with it.
The only exceptions are Subjects, where the Observer functions are exposed to the user.
fn next(&mut self, next: Self::In) {
    if !self.destination.is_closed() { // Unnecessary
        self.destination.next((self.mapper)(next));
    }
}
This only applies to the first synchronous interaction with downstream as any interaction with downstream can potentially cause it to be closed:
Incorrect:
fn next(&mut self, next: Self::In) {
    self.destination.next(next.clone()); // Still not necessary to check!
    self.destination.next(next); // Should be checked if not closed!
}
Correct:
fn next(&mut self, next: Self::In) { // Wouldn't even be called if it's closed!
    self.destination.next(next.clone());
    if !self.is_closed() { // The first next could cause downstream to close!
        self.destination.next(next);
    }
}
An operator that emits more than once per upstream signal is itself considered
a producer, so the second downstream next call should be checked!
A loop, for example, should break if further iterations are unnecessary.
for item in self.iterator.clone().into_iter() {
    if destination.is_closed() {
        break;
    }
    destination.next(item);
}
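To see why the check pays off in a producer, here is a runnable sketch of that loop. TakeDestination is a hypothetical destination invented for this example; it closes itself after a fixed number of values, similar in spirit to a downstream take(n).

```rust
// Hypothetical destination that closes itself after `remaining` values.
struct TakeDestination {
    remaining: usize,
    received: Vec<i32>,
}

impl TakeDestination {
    fn next(&mut self, value: i32) {
        if self.remaining > 0 {
            self.received.push(value);
            self.remaining -= 1;
        }
    }

    fn is_closed(&self) -> bool {
        self.remaining == 0
    }
}

// Producer loop: returns how many items were actually sent downstream.
fn emit_all(
    iterator: impl IntoIterator<Item = i32>,
    destination: &mut TakeDestination,
) -> usize {
    let mut sent = 0;
    for item in iterator {
        if destination.is_closed() {
            break; // Downstream closed: further iterations are wasted work.
        }
        destination.next(item);
        sent += 1;
    }
    sent
}

// Emits from a large range into a destination that only wants two values.
fn demo_take_two() -> (usize, Vec<i32>) {
    let mut destination = TakeDestination {
        remaining: 2,
        received: Vec::new(),
    };
    let sent = emit_all(1..=1_000_000, &mut destination);
    (sent, destination.received)
}
```

Without the is_closed check the loop would still walk through the whole range, calling next on a destination that ignores every value after the second.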
As a rule of thumb, if a subscriber’s is_closed implementation already respects the “closedness” of downstream, then for the very first interaction with it, it does not need to check if downstream is closed, as upstream already did.
What if I don’t?
If you do make extra checks, the penalty is just an extra if.
If you do not check if the destination is closed before sending a new signal, then any work done by downstream operators is also unnecessary.
Neither of these problems is “lethal”; this is about optimization.
Use Never as your signal type if that signal is never sent
The rx_core_common crate exposes the Never type which can’t be
constructed since it’s an enum with no variants.
Never is actually just a type alias for core::convert::Infallible. Infallible isn’t used directly because that name conveys that it’s an error, while here it could mean any event/signal that can never happen. And that event can be a valid output too, not just an error.
This type MUST be used to denote signals that are never produced instead of
using the unit type () which could be produced, and as such is inadequate to
denote that something won’t ever produce said signal.
- If an Observable never produces an error, it must set its OutError type to Never.
- If an Observable never produces a value, its Out type must be set to Never.
  - For example the ThrowObservable only produces a single error, therefore its Out type is Never.
  - And the NeverObservable never produces anything, so both Out and OutError are Never.
- If a Subscriber never sends errors downstream (for example it catches errors), it also must set its OutError type to Never.
- If a Subscriber never sends values downstream (for example it re-throws them as errors), it also must set its Out type to Never.
Note that in the future, once Rust stabilizes the actual never type (!), the Never type in rx_core_common will be deprecated in favor of it.
Tracking issue: https://github.com/AlexAegis/rx_bevy/issues/27
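The difference between Never and the unit type can be shown with a short sketch. The alias is redefined locally so the example is self-contained, and unwrap_never and unwrap_unit are hypothetical helpers that are not part of rx_core_common.

```rust
use core::convert::Infallible;

// Local copy of the alias described above, so the sketch stands on its own.
type Never = Infallible;

// Hypothetical helper: unwraps a result whose error signal can never occur.
fn unwrap_never<T>(result: Result<T, Never>) -> T {
    match result {
        Ok(value) => value,
        // Never has no variants, so an empty match is exhaustive:
        // the compiler knows this branch cannot be reached.
        Err(never) => match never {},
    }
}

// With the unit type the error can actually be constructed,
// so callers are forced to deal with a case that supposedly never happens.
fn unwrap_unit<T>(result: Result<T, ()>) -> Option<T> {
    result.ok()
}
```

Calling unwrap_never(Ok(5)) simply yields 5 and no Err value can ever be written for it, whereas unwrap_unit(Err(())) compiles fine and returns None.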