rx_bevy

Introduction

Reactive Extensions for Bevy!

rx_bevy brings observables and composable operators to make event orchestration a breeze!

Use Observables to serve as event sources, and subscribe to them to spawn active subscriptions that send events to Observers!

Use combination observables to combine other event sources, and use operators to transform these events and their behavior!

rx_core / rx_bevy

While this project was built to bring reactive extensions to Bevy specifically, its core was designed to be framework agnostic. As such, all generic observables, operators, and traits ship from the rx_core crate and are prefixed with rx_core. rx_bevy re-exports the entirety of rx_core and adds Bevy-specific observables and operators, along with the context implementation necessary to integrate with bevy_ecs.

This does not mean that rx_bevy is not a Bevy first project, as it suffers no downsides from keeping its core framework agnostic. In fact, many core trait design decisions were made to facilitate easy integration with ECS.

Book Contents

To learn the core concepts used throughout this project like what an Observable is, take a look at the Concepts page.

Observables, Operators and Subjects

Learn more about individual low-level components from their readme files, accessible right here in the book.

Every construct implemented follows the traditional Rx names, and behavior. So if you’ve used an Rx library before, even in another language, your knowledge applies here too.

rx_bevy specifics

Once you know the core concepts and what each component offers, learn how to use them within Bevy at the Usage Within Bevy page.

Examples

Toggle a timer’s speed by keyboard

In this example, the KeyboardObservables subscription emits just_pressed KeyCodes, and the filter operator limits them to 4. Then switch_map creates an internal subscription to an IntervalObservable whose speed depends on the KeyCode observed. The scan operator ignores the interval’s emissions (they restart on every new KeyCode) and counts the number of emissions. The result is a counter whose speed changes based on the key pressed.

Try this example in the observable gallery! Press L to subscribe/unsubscribe!

cargo run --example observable_gallery --features example

let interval_observable = commands
    .spawn((
        Name::new("IntervalObservable"),
        KeyboardObservable::default()
            .filter(|key_code| {
                matches!(
                    key_code,
                    KeyCode::Digit1 | KeyCode::Digit2 | KeyCode::Digit3 | KeyCode::Digit4
                )
            })
            .switch_map(|key_code| {
                let duration = match key_code {
                    KeyCode::Digit1 => Duration::from_millis(5),
                    KeyCode::Digit2 => Duration::from_millis(100),
                    KeyCode::Digit3 => Duration::from_millis(500),
                    KeyCode::Digit4 => Duration::from_millis(2000),
                    _ => unreachable!(),
                };
                IntervalObservable::new(IntervalObservableOptions {
                    duration,
                    start_on_subscribe: false,
                    max_emissions_per_tick: 4,
                })
            })
            .scan(|acc, _next| acc + 1, 0_usize)
            .into_component(),
    ))
    .id();

let example_observer = commands
    .spawn(Name::new("ExampleObserver"))
    .observe(print_next_observer::<usize>)
    .id();

let subscription_entity = commands.subscribe::<usize, (), Update>(
    interval_observable,
    example_observer,
);

Concepts & Terminology

Before diving into the individual observables and operators, let’s go through all the concepts and nomenclature you might encounter, and their definitions.

Pretty much everything within the repository assumes that you already know, to some degree, what each of these means and does.

Concept Hierarchy

Observer (Destination)

The simplest concept and the one that needs an immediate clarification is the observer as this - in the context of rx_bevy - is not the same thing as Bevy observers!

An RxObserver is something that implements three functions for its 3 observer “channels” via the RxObserver trait:

  • next
  • error
  • complete

Channels & Signals

Functions on the Observer trait can be thought of as channels, carriers of signals, each with a different purpose and behavior:

  • The next channel carries the value signal, the useful content you want to observe.

  • The error channel carries errors separately, to enable error handling independently of the values.

    If you’re curious about why errors are on a separate channel instead of just using Results, see the “Why do errors have their own channel?” section.

  • The complete channel carries the completion signal. It signals that no more next or error emissions will come. This signal does not carry a value, and it is usually sent right after the last next signal.
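The three channels can be pictured as three methods on a single trait. The following is a minimal sketch under that assumption; SketchObserver and Recorder are hypothetical names made up for illustration and are not the RxObserver trait shipped by rx_core.

```rust
/// Hypothetical stand-in for an observer: one method per channel.
trait SketchObserver<In, InError> {
    fn next(&mut self, value: In);
    fn error(&mut self, error: InError);
    fn complete(&mut self);
}

/// Records every signal it receives as a line of text.
#[derive(Default)]
struct Recorder {
    log: Vec<String>,
}

impl SketchObserver<i32, String> for Recorder {
    fn next(&mut self, value: i32) {
        self.log.push(format!("next: {value}"));
    }

    fn error(&mut self, error: String) {
        self.log.push(format!("error: {error}"));
    }

    fn complete(&mut self) {
        self.log.push("completed".to_string());
    }
}

/// Emits two values on the `next` channel, then the completion signal.
fn emit_two_then_complete(observer: &mut impl SketchObserver<i32, String>) {
    observer.next(1);
    observer.next(2);
    observer.complete();
}
```

Passing a Recorder to emit_two_then_complete leaves three entries in its log: two next lines followed by the completion.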

Emission

The act of emitting a signal.

Inputs

Observers are things that can receive values, so they define their input types using the ObserverInput trait. These types define the values that are received by the next and error functions.

pub trait ObserverInput {
    type In: Signal;
    type InError: Signal;
}

Notifications

In some places you may encounter signals referred to as notifications. The distinction is that notifications are the materialized form of signals.

This is useful whenever you want to materialize all the different kinds of signals of something into one value, whichever signal that may have been. For example when sending them as an event, or serializing them.

This could be an enum like the ObserverNotification:

pub enum ObserverNotification<In, InError>
where
    In: Signal,
    InError: Signal,
{
    Next(In),
    Error(InError),
    Complete,
}

Or as an event used in Bevy like the SubscriberNotificationEvent
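To make “materializing” concrete, here is a small sketch that turns calls on the three channels into plain values collected in a list. Notification and Materializer are hypothetical names for this illustration only; the real ObserverNotification additionally requires its parameters to implement Signal.

```rust
/// Hypothetical materialized form of the three signals.
#[derive(Debug, Clone, PartialEq)]
enum Notification<In, InError> {
    Next(In),
    Error(InError),
    Complete,
}

/// Collects whatever arrives on any channel into one list of values.
struct Materializer<In, InError> {
    notifications: Vec<Notification<In, InError>>,
}

impl<In, InError> Materializer<In, InError> {
    fn new() -> Self {
        Self { notifications: Vec::new() }
    }

    fn next(&mut self, value: In) {
        self.notifications.push(Notification::Next(value));
    }

    fn error(&mut self, error: InError) {
        self.notifications.push(Notification::Error(error));
    }

    fn complete(&mut self) {
        self.notifications.push(Notification::Complete);
    }
}

/// Two values followed by completion, captured as data.
fn materialize_demo() -> Vec<Notification<i32, String>> {
    let mut materializer = Materializer::<i32, String>::new();
    materializer.next(1);
    materializer.next(2);
    materializer.complete();
    materializer.notifications
}
```

Once signals are plain values like this, they can be stored, compared, sent as events, or serialized like any other data.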

Observable

You may already think of Observables as things that emit signals, but that’s not actually (strictly) true!

Observables are things that you can subscribe to with an observer as the destination of the resulting subscription! This resulting subscription is the source of signals!

Therefore, an observable is more like a piece of configuration based on which actual subscriptions can be created.

Some observables emit their values immediately and only return an already closed, “inert” subscription. For them, technically speaking, it was the observable that emitted those signals. For example, the of and the iterator observables both complete immediately on subscription.

This may seem like a superficial distinction to make as it still is the observable that you directly interact with, but it is important to understand how they work.

If we know that the state is always part of a subscription and not the observable, it’s clear that you can subscribe to the same observable multiple times, and all subscriptions are going to be unique, independent “instances”, with their own state!
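The split between configuration and state can be sketched without any library types. In the sketch below, CountUpTo and CountSubscription are hypothetical names, and the subscription is polled for values instead of pushing them to an observer, purely to keep the example short.

```rust
/// Hypothetical observable: pure configuration, no running state.
#[derive(Clone, Copy)]
struct CountUpTo {
    limit: u32,
}

/// Hypothetical subscription: owns the state created by `subscribe`.
struct CountSubscription {
    limit: u32,
    current: u32,
}

impl CountUpTo {
    /// Each call creates a fresh, independent subscription.
    fn subscribe(&self) -> CountSubscription {
        CountSubscription { limit: self.limit, current: 0 }
    }
}

impl CountSubscription {
    /// Advances only this subscription; returns `None` once the limit is reached.
    fn poll_next(&mut self) -> Option<u32> {
        if self.current < self.limit {
            self.current += 1;
            Some(self.current)
        } else {
            None
        }
    }
}
```

Subscribing twice to the same CountUpTo yields two counters that advance independently, because the counter lives in the subscription and not in the observable.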

Outputs

Observables define the types of what their subscriptions may emit, what errors (if any) they may produce:

pub trait ObservableOutput {
    type Out: Signal;
    type OutError: Signal;
}

Observables that do not emit errors (or values) use the Never type. Since Never is an empty enum, it is impossible to create a value of it! This ensures that if something says it won’t error, then it really can’t. (The Never type is actually an alias to the Infallible type used with the Result type!)
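The guarantee an empty enum gives can be checked with the standard library alone. This sketch assumes, as stated above, that Never behaves like an alias of std::convert::Infallible; the unwrap_never helper is hypothetical and only exists for this illustration.

```rust
use std::convert::Infallible;

/// Assumption for this sketch: `Never` is modelled as an alias of `Infallible`.
type Never = Infallible;

/// Unwraps a result whose error type has no values at all.
fn unwrap_never<T>(result: Result<T, Never>) -> T {
    match result {
        Ok(value) => value,
        // `never` cannot hold a value, so an empty match covers every case.
        Err(never) => match never {},
    }
}
```

The compiler accepts the empty match because there is no variant left to handle, which is exactly why an error channel typed as Never can never fire.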

Example: Subscribing to an Observable

This example demonstrates a subscription of an observable using the PrintObserver as the destination. Each value will be emitted immediately, but one by one, and then complete.

let iterator_observable = IteratorObservable::new(1..=4);
let subscription = iterator_observable
    .subscribe(PrintObserver::new("iterator_observable"));

Output:

iterator_observable - next: 1
iterator_observable - next: 2
iterator_observable - next: 3
iterator_observable - next: 4
iterator_observable - completed
iterator_observable - unsubscribed

Subscription

From the observable’s perspective, a subscription is an “instance” of an observable. The most important function on it is the unsubscribe function, which will stop the subscription, closing it.

From the subscription’s own perspective, it’s a value that represents owned resources (state, teardown functions) that you can release by calling unsubscribe.

The add and add_teardown methods on subscriptions let you add additional things into the subscription that will also be unsubscribed when the subscription unsubscribes. These can be other subscriptions, or just simple callbacks, aka teardowns.

A very important thing to learn here is that everything else (observables, operators, observers) is there just to create subscriptions. This is where state resides!

Teardown

A teardown function is an FnOnce that can be part of a subscription and will be called on unsubscribe.
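A subscription that owns teardowns can be sketched as a list of boxed FnOnce callbacks that is drained on unsubscribe. SketchSubscription is a hypothetical type for illustration; running a teardown immediately when it is added to an already closed subscription is an assumption of this sketch, not a documented guarantee of the crate.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical subscription that owns a list of teardown callbacks.
#[derive(Default)]
struct SketchSubscription {
    teardowns: Vec<Box<dyn FnOnce()>>,
    closed: bool,
}

impl SketchSubscription {
    /// Registers a teardown. Assumption: if already closed, it runs right away.
    fn add_teardown(&mut self, teardown: impl FnOnce() + 'static) {
        if self.closed {
            teardown();
        } else {
            self.teardowns.push(Box::new(teardown));
        }
    }

    /// Runs every pending teardown exactly once and marks the subscription closed.
    fn unsubscribe(&mut self) {
        self.closed = true;
        for teardown in self.teardowns.drain(..) {
            teardown();
        }
    }
}

impl Drop for SketchSubscription {
    /// Dropping releases the owned resources too.
    fn drop(&mut self) {
        self.unsubscribe();
    }
}

/// Registers two teardowns, then unsubscribes twice and drops.
fn teardown_demo() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut subscription = SketchSubscription::default();

    let log_a = Rc::clone(&log);
    subscription.add_teardown(move || {
        log_a.borrow_mut().push("release a");
    });
    let log_b = Rc::clone(&log);
    subscription.add_teardown(move || {
        log_b.borrow_mut().push("release b");
    });

    subscription.unsubscribe();
    subscription.unsubscribe(); // Nothing is left to run the second time.
    drop(subscription);

    let result = log.borrow().clone();
    result
}
```

Because each teardown is an FnOnce that is moved out of the list when it runs, calling unsubscribe again (or dropping afterwards) cannot run it a second time.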

Combination Observables

Some observables are there to combine other observables into one. As each input observable can emit as many signals as they want, at their own pace, or maybe even never, there are many ways to combine two observables.

Some examples are:

  • MergeObservable: A tuple of observables of common output types, that emit their values concurrently into a common stream of events.

  • ConcatObservable: A tuple of observables of common output types, that subscribes to one observable at a time, waits until it completes and then subscribes to the next one, in order. (Has the exact same behavior as a MergeObservable with a concurrency_limit of 1!)

  • CombineLatestObservable: Two observables emit into a tuple of each observable output type ((O1::Out, O2::Out)) when any of them emit, but only after each had at least one emission, aka primed.

  • ZipObservable: Two observables emit into a tuple of each observable’s output type ((O1::Out, O2::Out)) when, for each emission, there is one from the other observable. The first emission of O1 will always be paired with the first emission of O2, the second emissions will be emitted together and so on.

    This can lead to the excessive build up of events when one is emitting fast and the other one is slow. The buffering behavior can be controlled by its options value.

Currently only 2 observables can be combined by each combinator. If you want more, just nest more of them together. (Or help implement varargs.)
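The difference between zip and combine_latest is easiest to see on one interleaved timeline of emissions. The functions below are a plain-Rust sketch over a slice of events, not the crate’s ZipObservable or CombineLatestObservable; Emission, zip_sketch and combine_latest_sketch are hypothetical names.

```rust
use std::collections::VecDeque;

/// One entry on an interleaved timeline of two hypothetical sources.
#[derive(Clone, Copy)]
enum Emission {
    Left(i32),
    Right(char),
}

/// Pairs the n-th left value with the n-th right value, buffering the faster side.
fn zip_sketch(timeline: &[Emission]) -> Vec<(i32, char)> {
    let mut lefts = VecDeque::new();
    let mut rights = VecDeque::new();
    let mut out = Vec::new();
    for emission in timeline {
        match *emission {
            Emission::Left(value) => lefts.push_back(value),
            Emission::Right(value) => rights.push_back(value),
        }
        // Only pop when both sides have something waiting.
        if !lefts.is_empty() && !rights.is_empty() {
            if let (Some(left), Some(right)) = (lefts.pop_front(), rights.pop_front()) {
                out.push((left, right));
            }
        }
    }
    out
}

/// Emits the latest pair on every emission, once both sides have emitted (primed).
fn combine_latest_sketch(timeline: &[Emission]) -> Vec<(i32, char)> {
    let mut latest_left = None;
    let mut latest_right = None;
    let mut out = Vec::new();
    for emission in timeline {
        match *emission {
            Emission::Left(value) => latest_left = Some(value),
            Emission::Right(value) => latest_right = Some(value),
        }
        if let (Some(left), Some(right)) = (latest_left, latest_right) {
            out.push((left, right));
        }
    }
    out
}
```

For the timeline Left(1), Left(2), Right('a'), Left(3), Right('b'), the zip sketch pairs values by position and yields (1, 'a') and (2, 'b'), while the combine_latest sketch stays silent until the first Right arrives and then yields (2, 'a'), (3, 'a') and (3, 'b').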

Primed

Some observables do not emit anything until they are primed. For every subscription, this can happen at most once, and remains true for the remainder of its duration.

For example, some combination observables like combine_latest and zip emit values taken from all of their input observables, so it’s impossible for them to emit anything until this condition is met. Once it is met, the subscription to them can be considered primed, and expected to emit values.

Where priming matters is completion. If an upstream completion prevents priming, downstream should immediately complete too. Once primed, the condition to complete will depend on the observable.

Operators

Operators take an observable as input and return a new observable as output, enhancing the original observable with new behavior.

Composable Operators

Composable Operators are a subset of regular Operators. Unlike - for example - the retry operator, that (as the name suggests) retries subscription to the source, many other operators do not interact with their source observable beyond just subscribing to them once.

All a composable operator does is one or both of the following:

  • Wrap the destination into a subscriber on subscribe

  • Interact with the destination on subscribe

    The start_with and finalize operators don’t create anything new on subscribe, they only interact with the destination subscriber.

But they don’t know anything about who the source observable is.

Why though?

This enables 2 things:

  1. Simpler implementation for a lot of operators!

    By skipping implementing the actual operator that stores the source observable it wraps. This layer is auto implemented using the Pipe operator/observable whose sole job is to combine a source observable and a composable operator.

  2. Enables composite operators (behind the compose feature)!

    Composite operators are (composable) operators made from other composable operators!

    let my_operator = compose_operator::<i32, Never>()
        .map(|i| i * 2)
        .filter(|i| i < &4);
    

    Composite operators are a convenient way of creating new operators without actually having to implement one from scratch. The obvious limitation here is that they can only use the composable subset of operators. So no retry, no share.
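The idea that a composable operator only wraps the destination, and never sees the source, can be sketched with closures. In this sketch a destination is reduced to a boxed callback for the next channel; Destination, map_op, filter_op and compose are hypothetical helpers and not the crate’s compose_operator API.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// In this sketch a destination is just a boxed callback for the `next` channel.
type Destination<T> = Box<dyn FnMut(T)>;

/// Hypothetical composable `map`: wraps the destination, knows nothing about the source.
fn map_op<A: 'static, B: 'static>(
    mut mapper: impl FnMut(A) -> B + 'static,
) -> impl FnOnce(Destination<B>) -> Destination<A> {
    move |mut destination: Destination<B>| -> Destination<A> {
        Box::new(move |value: A| destination(mapper(value)))
    }
}

/// Hypothetical composable `filter`: forwards only values that pass the predicate.
fn filter_op<A: 'static>(
    mut predicate: impl FnMut(&A) -> bool + 'static,
) -> impl FnOnce(Destination<A>) -> Destination<A> {
    move |mut destination: Destination<A>| -> Destination<A> {
        Box::new(move |value: A| {
            if predicate(&value) {
                destination(value);
            }
        })
    }
}

/// Composes two operators into one; still no source observable involved.
fn compose<A, B, C>(
    first: impl FnOnce(Destination<B>) -> Destination<A>,
    second: impl FnOnce(Destination<C>) -> Destination<B>,
) -> impl FnOnce(Destination<C>) -> Destination<A> {
    move |destination: Destination<C>| first(second(destination))
}

/// Builds "map then filter", wraps a collecting destination, and feeds it 1, 2, 3.
fn composed_demo() -> Vec<i32> {
    let seen = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&seen);
    let destination: Destination<i32> = Box::new(move |value| {
        sink.borrow_mut().push(value);
    });

    // Similar in spirit to `.map(|i| i * 2).filter(|i| i < &4)`.
    let operator = compose(map_op(|i: i32| i * 2), filter_op(|i: &i32| *i < 4));
    let mut upstream_facing = operator(destination);
    for value in 1..=3 {
        upstream_facing(value);
    }

    let result = seen.borrow().clone();
    result
}
```

Note that the composed operator is built before any source exists; only the final loop plays the role of an upstream. Of the inputs 1, 2 and 3, only 1 survives: it is doubled to 2, while 4 and 6 are filtered out.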

Subscribers

A subscriber is something that’s both an observer and a subscription at the same time!

Most of the time, they wrap another observer or subscriber, which means you can have a deeply nested series of subscribers, in which the deepest element is usually a regular observer, the true destination. And this whole nested structure lives in a subscription!

A single subscriber usually implements a single, easily understandable behavior, that it applies by reacting to upstream emissions, and producing different downstream emissions.

Downstream & Upstream from the Subscriber’s Perspective

In the context of observables and operators, downstream refers to the destination, where signals are sent, and upstream refers to the source, the caller of the next, error and complete functions.

For example, looking at the map operator’s next implementation:

fn next(
    &mut self,
    next: Self::In, // This is coming from upstream
) {
    let mapped = (self.mapper)(next);
    self.destination.next(mapped); // And this is sending it downstream
}

Downstream & Upstream from the Operator’s Perspective

If we zoom out where this operator is used:

let _s = (1..=5)
    .into_observable() // Relative to the `map` operator, this `IteratorObservable` is upstream
    .map(|i| i * 2)
    .skip(1) // And this `skip` operator is downstream
    .subscribe(PrintObserver::new("map_operator")); // The `PrintObserver` is also downstream relative to `map`.

UpgradeableObserver

UpgradeableObserver Source

When subscribing to an Observable, sometimes we want the observable to be able to send an unsubscribe call to this destination, and sometimes it should be detached.

Remember: A subscriber is both an observer and a subscription

Regular subscribers implement it by returning themselves, as there is nothing to upgrade to become a subscriber. Observers do not have a SubscriptionLike impl, so when used as a destination in a subscribe call they need to pick another subscriber to wrap themselves in. This is usually the detached implementation for observers.

Detached Subscriber

A subscriber is detached if it completely avoids sending unsubscribe, or in some cases even complete signals.

Detached subscribers can’t unsubscribe downstream, serving as a hard boundary for unsubscription.

Why do errors have their own channel?

Since each operator and subscriber implements and does only one thing, dealing with erroneous values in every operator would be very tedious. Imagine that when you have an observable that emits Results because it’s fallible, your mappers would need to do an inner map:

fallible_observable
    .map(|i_result| i_result.map(|i| i * 2))
    .subscribe(...);

In case you do want to move errors between the error and next channels, you can use the into_result operator to combine all upstream next and error signals into only next signals downstream, changing the downstream error type to Never.

And using the lift_error operator, you can unpack upstream Result values into downstream next and error signals. (In this case, you actually have 2 separate error types: the upstream error signal, and the upstream next result’s error type. This is why you need to supply an error mapping function into this operator.)
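Moving errors between the two channels can be sketched on single signals. The functions below are hypothetical stand-ins named after the operators; in particular, which of the two error types lift_error maps into which is an assumption here (the sketch maps the result’s error into the upstream error type).

```rust
use std::convert::Infallible;

/// Hypothetical signal carrying either a value or an error.
#[derive(Debug, PartialEq)]
enum Signal<T, E> {
    Next(T),
    Error(E),
}

/// Sketch of `into_result`: both channels collapse into `next`,
/// and the downstream error type becomes uninhabited.
fn into_result_sketch<T, E>(signal: Signal<T, E>) -> Signal<Result<T, E>, Infallible> {
    match signal {
        Signal::Next(value) => Signal::Next(Ok(value)),
        Signal::Error(error) => Signal::Next(Err(error)),
    }
}

/// Sketch of `lift_error`: unpacks `Result` values into the two channels.
/// Assumption: the mapper converts the result's error into the upstream error type.
fn lift_error_sketch<T, ResultError, UpstreamError>(
    signal: Signal<Result<T, ResultError>, UpstreamError>,
    map_result_error: impl FnOnce(ResultError) -> UpstreamError,
) -> Signal<T, UpstreamError> {
    match signal {
        Signal::Next(Ok(value)) => Signal::Next(value),
        Signal::Next(Err(error)) => Signal::Error(map_result_error(error)),
        Signal::Error(error) => Signal::Error(error),
    }
}
```

The second function shows why a mapping function is needed at all: a Next(Err(..)) carries one error type and an Error(..) carries another, and downstream can only have a single error type.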

Other Operators

More detailed information on individual operators and their behavior can be seen in their documentation page here in the book, or their package readme (which are the same documents).

The most important information on them is also available on the operators and (primarily) the extension functions themselves, for easy access during development!

Subjects

A subject is something that is both an observable and an observer at the same time!

This makes subjects capable of feeding data into subscriptions from “outside”!

Run this example: cargo run --package rx_core_subject_publish --example subject_example

let mut subject = PublishSubject::<i32>::default();
subject.next(1); // Meteora - Track 11

let mut subscription = subject
    .clone()
    .finalize(|| println!("finalize"))
    .subscribe(PrintObserver::<i32>::new("subject_example"));

subject.next(2);
subject.next(3);
subscription.unsubscribe();
subject.next(4);

Output:

subject_example - next: 2
subject_example - next: 3
finalize
subject_example - unsubscribed

We can clearly see that only the values emitted while the subscription was active were observed!

Multicasting

As with any observable, a subject can be subscribed to multiple times! This means subjects are fundamentally multicasting!

Whenever you put a value into a subject, all of its subscribers will receive it.

Once unsubscribed, no new values can be emitted by the subject. New subscriptions attempted on the subject will be immediately unsubscribed.

Example:

Run this example: cargo run --package rx_core_subject_publish --example subject_multicasting_example

let mut subject = PublishSubject::<i32>::default();

subject.next(1);

let mut subscription_1 = subject
    .clone()
    .finalize(|| println!("finalize subscription 1"))
    .subscribe(PrintObserver::<i32>::new("subject_subscription_1"));

subject.next(2);

let _subscription_2 = subject
    .clone()
    .finalize(|| println!("finalize subscription 2"))
    .subscribe(PrintObserver::<i32>::new("subject_subscription_2"));

subject.next(3);

subscription_1.unsubscribe();

subject.next(4);

Output:

subject_subscription_1 - next: 2
subject_subscription_1 - next: 3
subject_subscription_2 - next: 3
finalize subscription 1
subject_subscription_1 - unsubscribed
subject_subscription_2 - next: 4
finalize subscription 2
subject_subscription_2 - unsubscribed

You can see that the signal 3 was heard by both subscriptions! And each subscription had its own finalize callback! Each individual subscription is unique and can have as many or as few operators on it as you want!

PublishSubject

PublishSubject Source

The vanilla subject, it multicasts incoming values to currently active subscribers.

Only completion and error signals are replayed to new subscribers if the subject was finished. If the subject was also unsubscribed, the new subscription too will be immediately unsubscribed.

As all other subjects are just wrappers around PublishSubject, this behavior is shared across all of them.
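The multicasting core of a publish subject can be sketched as a list of callbacks that every incoming value is cloned into. SketchSubject is a hypothetical, single-threaded stand-in that only models the next channel; completion, errors, and the replay rules described above are deliberately left out.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical publish subject: forwards each value to all current subscribers.
struct SketchSubject<T> {
    next_id: usize,
    subscribers: Vec<(usize, Box<dyn FnMut(T)>)>,
}

impl<T: Clone> SketchSubject<T> {
    fn new() -> Self {
        Self { next_id: 0, subscribers: Vec::new() }
    }

    /// Registers a callback and returns an id that can be used to unsubscribe.
    fn subscribe(&mut self, on_next: impl FnMut(T) + 'static) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        let callback: Box<dyn FnMut(T)> = Box::new(on_next);
        self.subscribers.push((id, callback));
        id
    }

    fn unsubscribe(&mut self, id: usize) {
        self.subscribers.retain(|(subscriber_id, _)| *subscriber_id != id);
    }

    /// Multicasts one value to everyone subscribed right now.
    fn next(&mut self, value: T) {
        for (_, on_next) in self.subscribers.iter_mut() {
            (*on_next)(value.clone());
        }
    }
}

/// Mirrors the order of calls in the multicasting example above.
fn multicast_demo() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut subject = SketchSubject::<i32>::new();
    subject.next(1); // No subscribers yet, so this value is lost.

    let log_1 = Rc::clone(&log);
    let first = subject.subscribe(move |value| {
        log_1.borrow_mut().push(format!("sub_1: {value}"));
    });
    subject.next(2);

    let log_2 = Rc::clone(&log);
    let _second = subject.subscribe(move |value| {
        log_2.borrow_mut().push(format!("sub_2: {value}"));
    });
    subject.next(3);

    subject.unsubscribe(first);
    subject.next(4);

    let result = log.borrow().clone();
    result
}
```

Only values pushed while a callback is registered reach it, which is the “currently active subscribers” rule in miniature.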

BehaviorSubject

BehaviorSubject Source

A BehaviorSubject is a subject that always has exactly 1 value of its input type stored, therefore to create a new BehaviorSubject, you must provide an initial value.

Immediately when subscribed to, the currently stored value (at first, this initial value) will be emitted!

This makes the BehaviorSubject ideal to be used as a reactive storage. A value that can change over time where subscribers are always reacting to the latest value without having to wait for it!

BehaviorSubjects continue to replay even after unsubscribed, but they can’t receive new values and the new subscription will be immediately unsubscribed. They do not replay however when they errored!

Not every type of value can have a sensible default, or even if they do, sometimes it doesn’t make sense to use it in the context! In that case, use a ReplaySubject<1, _>!

Example:

Run this example: cargo run --package rx_core_subject_behavior --example behavior_subject_example

let mut subject = BehaviorSubject::<i32>::new(10);

// Immediately prints "hello 10"
let mut hello_subscription = subject
    .clone()
    .subscribe(PrintObserver::<i32>::new("hello"));

subject.next(11);

let _s1 = subject
    .clone()
    .map(|next| next * 2)
    .subscribe(PrintObserver::<i32>::new("hi double"));

subject.next(12);
hello_subscription.unsubscribe();
subject.next(13);
subject.complete();

let _completed_subscription = subject
    .clone()
    .subscribe(PrintObserver::<i32>::new("hello_completed"));

Output:

hello - next: 10
hello - next: 11
hi double - next: 22
hello - next: 12
hi double - next: 24
hello - unsubscribed
hi double - next: 26
hi double - completed
hi double - unsubscribed
hello_completed - next: 13
hello_completed - completed
hello_completed - unsubscribed

ReplaySubject

ReplaySubject Source

A ReplaySubject is a subject that buffers the last N emissions, and when subscribed to immediately replays all of them!

Unlike BehaviorSubject, it does not guarantee that a value is always present, because it does not require you to define some values to create it.

But like BehaviorSubjects, ReplaySubjects continue to replay even after unsubscribed, but they can’t receive new values and the new subscription will be immediately unsubscribed.

You can still next some values immediately into it if you want!

This makes the ReplaySubject ideal to cache something that does not have a sensible default, initial value!

For example: you’re waiting for a height measurement, which is a number, and numbers have a default value of 0. Some pipelines downstream take this measurement and calculate some things for you. It does not make sense to run that computation with the value 0, as it’s not an actual measurement, just a default. For this situation you can use either a ReplaySubject<1, f32> or a BehaviorSubject<Option<f32>> initialized with None. The latter is useful when you want things to start immediately, even if there is no actual value yet, or when you want to be able to return to an initial, “uninitialized” state.

Example:

Run this example: cargo run --package rx_core_subject_replay --example replay_subject_example

let mut subject = ReplaySubject::<2, i32>::default();

// Doesn't print out anything on subscribe
let _s = subject
    .clone()
    .subscribe(PrintObserver::<i32>::new("hello"));

subject.next(1);
subject.next(2);
subject.next(3);

// Only the last two values are printed out, since our capacity is just 2
let _s2 = subject
    .clone()
    .subscribe(PrintObserver::<i32>::new("hi"));

subject.next(4);
subject.next(5);

Output:

hello - next: 1
hello - next: 2
hello - next: 3
hi - next: 2
hi - next: 3
hello - next: 4
hi - next: 4
hello - next: 5
hi - next: 5
hi - unsubscribed
hello - unsubscribed

When the second subscription subscribed, the buffer contained [2, 3], which the new subscription received immediately!
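The buffering rule behind this output can be sketched with a bounded VecDeque. ReplayBuffer is a hypothetical type that only models the “keep the last N values” part; it is not how ReplaySubject is implemented.

```rust
use std::collections::VecDeque;

/// Hypothetical replay buffer keeping only the last `CAPACITY` values.
struct ReplayBuffer<const CAPACITY: usize, T> {
    values: VecDeque<T>,
}

impl<const CAPACITY: usize, T: Clone> ReplayBuffer<CAPACITY, T> {
    fn new() -> Self {
        Self { values: VecDeque::with_capacity(CAPACITY) }
    }

    /// Stores a value, evicting the oldest one when the buffer is full.
    fn push(&mut self, value: T) {
        if CAPACITY == 0 {
            return;
        }
        if self.values.len() == CAPACITY {
            self.values.pop_front();
        }
        self.values.push_back(value);
    }

    /// What a new subscriber would receive immediately on subscribe.
    fn replay(&self) -> Vec<T> {
        self.values.iter().cloned().collect()
    }
}
```

With a capacity of 2, pushing 1, 2 and 3 leaves [2, 3] in the buffer, matching what the second subscription received.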

AsyncSubject

AsyncSubject Source

The AsyncSubject will only emit once it completes.

Late subscribers who subscribe after it has already completed will also receive the last result, followed immediately by a completion signal.

What it will emit on completion depends on the reducer function used. By default, it just replaces the result with the most recent observed value next-ed into the subject. But you can also specify your own reducer to accumulate all observed values to be the result on completion.
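The role of the reducer can be sketched as a fold that runs on every next but only hands out its result on completion. AsyncState is a hypothetical type, and the reducer signature used here (previous result as an Option, plus the new value) is an assumption of this sketch, not the crate’s actual signature.

```rust
/// Hypothetical state of an async subject: a reducer plus the result so far.
struct AsyncState<T> {
    reducer: Box<dyn FnMut(Option<T>, T) -> T>,
    result: Option<T>,
}

impl<T: Clone + 'static> AsyncState<T> {
    /// Default behaviour described above: keep only the most recent value.
    fn latest() -> Self {
        Self::with_reducer(|_previous, next| next)
    }

    /// Custom reducer, for example to accumulate all observed values.
    fn with_reducer(reducer: impl FnMut(Option<T>, T) -> T + 'static) -> Self {
        Self { reducer: Box::new(reducer), result: None }
    }

    /// Values are folded into the result, but nothing is emitted yet.
    fn next(&mut self, value: T) {
        let previous = self.result.take();
        self.result = Some((self.reducer)(previous, value));
    }

    /// Completion is the only moment the result comes out.
    fn complete(&mut self) -> Option<T> {
        self.result.clone()
    }
}
```

With the default reducer, pushing 1, 2 and 3 and then completing yields 3. With a summing reducer such as |acc, next| acc.unwrap_or(0) + next, the same inputs yield 6.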

Example:

Run this example: cargo run --package rx_core_subject_async --example async_subject_example

let mut subject = AsyncSubject::<i32>::default();

let mut _subscription_1 = subject
    .clone()
    .subscribe(PrintObserver::<i32>::new("async_subject sub_1"));

subject.next(1);
subject.next(2);

let mut _subscription_2 = subject
    .clone()
    .subscribe(PrintObserver::<i32>::new("async_subject sub_2"));

subject.next(3);
subject.complete();

let mut _subscription_3 = subject
    .clone()
    .subscribe(PrintObserver::<i32>::new("async_subject sub_3"));

Output:

async_subject sub_1 - next: 3
async_subject sub_2 - next: 3
async_subject sub_1 - completed
async_subject sub_2 - completed
async_subject sub_1 - unsubscribed
async_subject sub_2 - unsubscribed
async_subject sub_3 - next: 3
async_subject sub_3 - completed
async_subject sub_3 - unsubscribed

Scheduling

Every example so far was “immediate”, they either emitted all their values immediately, or - in the case of subjects - when we pushed values into them.

But what really makes observables useful is when they can react to things and emit a signal when they do! And for that, a subscription or a subscriber needs to be able to emit signals even without upstream signals triggering its own logic.

This requires something that runs in the “background” to drive the work issued by subscribers or subscriptions.

For example: “Do a next call on this, 2 seconds from now!” or “Call next on this every 200ms from now, until I say stop!”

Scheduler

The scheduler is a shared queue that subscribers have access to (always passed in by the user) to issue work to be executed.

Executor

The executor is responsible for driving the work delegated to the scheduler and collected by the scheduler’s queue. It owns the scheduler, and handles to the scheduler can be acquired from the executor.

Work

There are multiple types of work, depending on how they are handled with respect to time, so that the issuer of the work does not have to reimplement basic time-based logic like a delay.

Work can be issued, cancelled, or invoked from the scheduler queue.

Immediate Work

The simplest type of work, it executes as soon as the executor receives it.

Delayed Work

Delayed work will be executed only after its specified delay has passed.

Repeated Work

Repeated work re-executes its work each time it repeats, after a specified time interval.

Continuous Work

Continuous work is like repeated work but without the time interval: it simply executes as often as it can.

It depends on the executor to define the actual frequency this type of work is running at.

  • With the tickable executor, this means on every tick call.
  • In Bevy, this means once every frame.
  • In an async executor, this would be set to a reasonably fast interval, like a target FPS.

Invoked Work

Invoked work is not executed automatically, but based on its invoke_id can be “invoked” which means executing it as soon as the executor can.

For a ticking executor, this means the next tick after invocation.
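The time-based kinds of work described above (delayed, repeated, continuous) can be sketched with a tiny executor that only advances when tick is called. SketchExecutor and its methods are hypothetical names; immediate and invoked work are omitted to keep the sketch short, and work items are reduced to labels written into a log.

```rust
use std::time::Duration;

/// Hypothetical kinds of time-based work, mirroring the prose above.
enum WorkKind {
    Delayed { due: Duration },
    Repeated { interval: Duration, next_due: Duration },
    Continuous,
}

struct Work {
    label: &'static str,
    kind: WorkKind,
}

/// Hypothetical ticking executor: time only advances through `tick`.
#[derive(Default)]
struct SketchExecutor {
    now: Duration,
    queue: Vec<Work>,
    log: Vec<&'static str>,
}

impl SketchExecutor {
    fn schedule_delayed(&mut self, label: &'static str, delay: Duration) {
        let due = self.now + delay;
        self.queue.push(Work { label, kind: WorkKind::Delayed { due } });
    }

    fn schedule_repeated(&mut self, label: &'static str, interval: Duration) {
        assert!(!interval.is_zero(), "a zero interval would never catch up");
        let next_due = self.now + interval;
        self.queue.push(Work { label, kind: WorkKind::Repeated { interval, next_due } });
    }

    fn schedule_continuous(&mut self, label: &'static str) {
        self.queue.push(Work { label, kind: WorkKind::Continuous });
    }

    /// Advances time and "executes" (logs) every piece of work that is due.
    fn tick(&mut self, delta: Duration) {
        self.now += delta;
        let now = self.now;
        let mut remaining = Vec::new();
        for mut work in std::mem::take(&mut self.queue) {
            let keep = match &mut work.kind {
                WorkKind::Delayed { due } => {
                    if *due <= now {
                        self.log.push(work.label);
                        false // Delayed work runs once and is then removed.
                    } else {
                        true
                    }
                }
                WorkKind::Repeated { interval, next_due } => {
                    while *next_due <= now {
                        self.log.push(work.label);
                        *next_due += *interval;
                    }
                    true
                }
                WorkKind::Continuous => {
                    self.log.push(work.label); // Runs on every tick.
                    true
                }
            };
            if keep {
                remaining.push(work);
            }
        }
        self.queue = remaining;
    }
}

/// Schedules one piece of each kind and ticks twice by 1.5 seconds.
fn executor_demo() -> Vec<&'static str> {
    let mut executor = SketchExecutor::default();
    executor.schedule_delayed("delayed", Duration::from_secs(2));
    executor.schedule_repeated("repeated", Duration::from_secs(1));
    executor.schedule_continuous("continuous");
    executor.tick(Duration::from_millis(1500));
    executor.tick(Duration::from_millis(1500));
    executor.log
}
```

In the demo, the first tick (1.5s) runs the repeated and the continuous work; the second tick (3.0s) runs the delayed work once, the repeated work twice (for the 2s and 3s marks), and the continuous work again.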

Scheduler Context

Executors define a context, passed in as a mutable reference to the work whenever it is executed. The main job of the context is to provide the current time (as a Duration, denoting the time passed since startup).

The context can also expose executor-specific capabilities, for example interacting with the Bevy ECS world.

The context is defined by the executor, but subscribers can be written for specific contexts too. Resulting in context specific subscribers with extra capabilities relevant only in that context, compatible only with that executor.

Scheduler Work Input

Most generic scheduled subscribers do not need to know about anything besides the time coming from the context. Still, some executors can provide extra data relevant to the execution of the work at that moment.

For example: In the TickingExecutor, the Tick object is passed into every executed work.

TickingExecutor

TickingExecutor Source

The base executor used both for tests and for Bevy. It can be manually advanced by calling tick.

Time can only move forwards in the executor!

Interval Example

let mut mock_executor = MockExecutor::new_with_logging();
let scheduler = mock_executor.get_scheduler_handle();

let mut interval_observable = IntervalObservable::new(
    IntervalObservableOptions {
        duration: Duration::from_secs(1),
        max_emissions_per_tick: 3,
        start_on_subscribe: true,
    },
    scheduler,
);
let _subscription = interval_observable.subscribe(PrintObserver::new("interval_observable"));

mock_executor.tick(Duration::from_millis(600));
mock_executor.tick(Duration::from_millis(401));
mock_executor.tick(Duration::from_millis(16200)); // lag spike! would result in 16 emissions, but the limit is 3!
mock_executor.tick(Duration::from_millis(1200));
mock_executor.tick(Duration::from_millis(2200));

Output:

interval_observable - next: 0
Ticking... (600ms)
Ticking... (401ms)
interval_observable - next: 1
Ticking... (16.2s)
interval_observable - next: 2
interval_observable - next: 3
interval_observable - next: 4
Ticking... (1.2s)
interval_observable - next: 5
Ticking... (2.2s)
interval_observable - next: 6
interval_observable - next: 7
interval_observable - unsubscribed
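The emission counts in this output can be reproduced with a small bookkeeping model. This is one plausible model and an assumption of this sketch, not the crate’s implementation: elapsed time is accumulated, each tick emits at most max_emissions_per_tick values, whole intervals beyond that cap are discarded, and only the leftover partial interval carries over. IntervalModel is a hypothetical name.

```rust
use std::time::Duration;

/// Hypothetical model of an interval subscription's bookkeeping.
struct IntervalModel {
    interval: Duration,
    max_emissions_per_tick: usize,
    accumulated: Duration,
    next_value: usize,
}

impl IntervalModel {
    /// Returns the model and whatever is emitted right away on subscribe.
    fn subscribe(
        interval: Duration,
        max_emissions_per_tick: usize,
        start_on_subscribe: bool,
    ) -> (Self, Vec<usize>) {
        assert!(!interval.is_zero(), "the interval must be non-zero");
        let mut model = Self {
            interval,
            max_emissions_per_tick,
            accumulated: Duration::ZERO,
            next_value: 0,
        };
        let initial = if start_on_subscribe { model.emit(1) } else { Vec::new() };
        (model, initial)
    }

    /// Emits `count` consecutive counter values.
    fn emit(&mut self, count: usize) -> Vec<usize> {
        let start = self.next_value;
        self.next_value += count;
        (start..self.next_value).collect()
    }

    /// Advances time by `delta` and returns the values emitted during this tick.
    fn tick(&mut self, delta: Duration) -> Vec<usize> {
        self.accumulated += delta;
        let elapsed = self.accumulated.as_nanos();
        let interval = self.interval.as_nanos();
        let due = (elapsed / interval) as usize;
        // Assumption: intervals beyond the cap are dropped, the partial one is kept.
        self.accumulated = Duration::from_nanos((elapsed % interval) as u64);
        let count = due.min(self.max_emissions_per_tick);
        self.emit(count)
    }
}
```

Feeding this model the same ticks as above (600ms, 401ms, 16.2s, 1.2s, 2.2s) with a 1 second interval and a cap of 3 gives 0 on subscribe, then nothing, 1, then 2 3 4 for the lag spike, then 5, then 6 7.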

Usage Within Bevy

Add the RxPlugin and one or more RxSchedulerPlugins for the schedules you want to run your scheduled observables in.

If you do not use scheduled observables, you can skip adding the RxSchedulerPlugins. You know if you use one because they all need a SchedulerHandle. Some Commands extensions also require a SchedulerHandle to create any kind of subscriptions!

fn main() -> AppExit {
    App::new()
        .add_plugins((
            DefaultPlugins,
            RxPlugin,
            RxSchedulerPlugin::<Update, Virtual>::default(),
        ))
        .run()
}

To access a scheduler, use the RxSchedule<BevySchedule, Clock> SystemParam:

fn setup_subscription(
    mut commands: Commands,
    rx_schedule_update_virtual: RxSchedule<Update, Virtual>,
) {
    // Use `rx_schedule_update_virtual` to get a `SchedulerHandle` for
    // creating scheduled observables within this system.
}

Now you can create subscriptions that are fully integrated with Bevy’s ECS, live as entities and react to Bevy events and component removals:

Use the RxSignal<Out, OutError> to observe signals from EntityDestination subscriptions!

fn setup_subscription(
    mut commands: Commands,
    rx_schedule_update_virtual: RxSchedule<Update, Virtual>,
) {
    let destination_entity = commands
        .spawn_empty()
        .observe(|signal: Trigger<RxSignal<usize>>| println!("{:?}", signal.signal()))
        .id();

    let observable_entity = commands
        .spawn(
            IntervalObservable::new(
                IntervalObservableOptions {
                    duration: Duration::from_secs(1),
                    start_on_subscribe: true,
                    max_emissions_per_tick: 1,
                },
                rx_schedule_update_virtual.handle(),
            )
            // .map(|i| i.to_string()) // This would change the output type of the observable, making the subscribe command below fail!
            .into_component(),
        )
        .id();

    // This is now **not** an `EntitySubscription`, as the subscription
    // will be made once the command executes! It's just an `Entity`!
    // Put it somewhere so you can despawn it!
    let _subscription_entity = commands.subscribe(
        observable_entity,
        EntityDestination::<usize, Never>::new(
            destination_entity,
            rx_schedule_update_virtual.handle(),
        ),
    );
}

Or you can create subscriptions that only partially integrate with Bevy’s ECS:

It’s perfectly fine to not use observables as components! You can create subscriptions directly from observables created in systems! Just be careful not to drop the subscription, as subscriptions unsubscribe on drop!

fn setup_subscription(
    mut commands: Commands,
    rx_schedule_update_virtual: RxSchedule<Update, Virtual>,
    mut my_subscriptions: ResMut<MySubscriptions>,
) {
    let destination_entity = commands
        .spawn_empty()
        .observe(|signal: Trigger<RxSignal<usize>>| println!("{:?}", signal.signal()))
        .id();

    let subscription = IntervalObservable::new(
        IntervalObservableOptions {
            duration: Duration::from_secs(1),
            start_on_subscribe: true,
            max_emissions_per_tick: 1,
        },
        rx_schedule_update_virtual.handle(),
    )
    .subscribe(EntityDestination::new(
        destination_entity,
        rx_schedule_update_virtual.handle(),
    ));

    my_subscriptions.add(subscription);
}
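The unsubscribe-on-drop behavior follows Rust's usual RAII pattern, which is why the system above stores the subscription in a resource. The following is a minimal std-only sketch that does not use rx_bevy: `GuardedSubscription` and its `active` flag are hypothetical names, used only to illustrate why a handle that goes out of scope stops its subscription.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for a subscription handle; not an rx_bevy type.
struct GuardedSubscription {
    active: Rc<Cell<bool>>,
}

impl GuardedSubscription {
    fn new(active: Rc<Cell<bool>>) -> Self {
        active.set(true);
        Self { active }
    }
}

impl Drop for GuardedSubscription {
    // Dropping the handle is what "unsubscribes" in this model.
    fn drop(&mut self) {
        self.active.set(false);
    }
}

/// Returns whether the subscription was active while held, and after the drop.
fn active_while_held_and_after_drop() -> (bool, bool) {
    let active = Rc::new(Cell::new(false));
    let subscription = GuardedSubscription::new(active.clone());
    let while_held = active.get();
    drop(subscription);
    (while_held, active.get())
}

fn main() {
    let (while_held, after_drop) = active_while_held_and_after_drop();
    println!("active while held: {while_held}, after drop: {after_drop}");
}
```

Binding a subscription to `_` or letting a local go out of scope at the end of a system has the same effect as the explicit `drop` here.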

Use operators to create more complex observables, and orchestrate your events!

KeyboardObservable::new(default(), rx_schedule_update_virtual.handle())
    .filter(|key_code, _| {
        matches!(
            key_code,
            KeyCode::Digit1 | KeyCode::Digit2 | KeyCode::Digit3 | KeyCode::Digit4
        )
    })
    .start_with(KeyCode::Digit3)
    .switch_map(
        move |key_code| {
            let duration = match key_code {
                KeyCode::Digit1 => Duration::from_millis(5),
                KeyCode::Digit2 => Duration::from_millis(100),
                KeyCode::Digit3 => Duration::from_millis(500),
                KeyCode::Digit4 => Duration::from_millis(2000),
                _ => unreachable!(),
            };
            println!("Switching to a new inner observable with duration: {duration:?}");
            IntervalObservable::new(
                IntervalObservableOptions {
                    duration,
                    start_on_subscribe: false,
                    max_emissions_per_tick: 4,
                },
                schedule_update_virtual.clone(),
            )
        },
        Never::map_into(),
    )
    .scan(|acc, _next| acc + 1, 0_usize)

Use subjects or the share operator to multicast observables to multiple subscribers, save computation by sharing a single subscription!

Observables (Core)

observable_closed

An observable that immediately closes without completing or emitting any values.

See also

Example

cargo run -p rx_core --example observable_closed_example
let _subscription = closed().subscribe(PrintObserver::new("closed"));
println!("end");

Output:

closed - unsubscribed
end

observable_combine_changes

The CombineChangesObservable subscribes to two input observables, and emits when either of them emits, even if the other hasn’t emitted yet.

It wraps downstream signals into the Change<T> enum, where a value is either:

  • JustUpdated - When this value’s change caused the emission.
  • Latest - When this value did not change since the last emission.
  • None - When this observable has not emitted yet.
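To make the three states concrete, here is a minimal std-only sketch of the bookkeeping involved. It does not use rx_core: the local `Change` enum, the `Input` helper and `combine_changes_model` are hypothetical names that only model the semantics described above for a recorded sequence of emissions.

```rust
/// Local model of the three states described above; not the rx_core type.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Change<T> {
    JustUpdated(T),
    Latest(T),
    None,
}

/// Which input emitted (hypothetical helper for this sketch).
enum Input<A, B> {
    Left(A),
    Right(B),
}

/// Replays a sequence of input emissions and collects what a
/// combine_changes-style combinator would emit downstream.
fn combine_changes_model<A: Copy, B: Copy>(
    events: Vec<Input<A, B>>,
) -> Vec<(Change<A>, Change<B>)> {
    let mut last_left: Option<A> = None;
    let mut last_right: Option<B> = None;
    let mut emitted = Vec::new();

    for event in events {
        match event {
            Input::Left(value) => {
                last_left = Some(value);
                // The other side is reported as `Latest`, or `None` if it never emitted.
                let right = last_right.map_or(Change::None, Change::Latest);
                emitted.push((Change::JustUpdated(value), right));
            }
            Input::Right(value) => {
                last_right = Some(value);
                let left = last_left.map_or(Change::None, Change::Latest);
                emitted.push((left, Change::JustUpdated(value)));
            }
        }
    }
    emitted
}

fn main() {
    let emitted = combine_changes_model(vec![
        Input::Left("Hello!"),
        Input::Right(10),
        Input::Left("Szia!"),
    ]);
    for pair in &emitted {
        println!("{pair:?}");
    }
}
```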

See Also

  • CombineLatestObservable - Subscribes to two different observables, and emits the latest of both values when either of them emits. It only starts emitting once both have emitted at least once.
  • ZipObservable - Subscribes to two different observables, and emits both values when both of them have emitted, pairing up emissions by the order they happened.
  • JoinObservable - Subscribes to two different observables, and emits the latest of both values once both of them have completed!

Example

cargo run -p rx_core --example observable_combine_changes_example
let mut greetings_subject = PublishSubject::<&'static str>::default();
let mut count_subject = PublishSubject::<usize>::default();

let mut subscription = combine_changes(greetings_subject.clone(), count_subject.clone())
    .subscribe(PrintObserver::new("combine_changes"));

greetings_subject.next("Hello!");
count_subject.next(10);
count_subject.next(20);
greetings_subject.next("Szia!");
greetings_subject.complete();
count_subject.next(30);
count_subject.complete();
subscription.unsubscribe();

Output:

combine_changes - next: (JustUpdated("Hello!"), None)
combine_changes - next: (Latest("Hello!"), JustUpdated(10))
combine_changes - next: (Latest("Hello!"), JustUpdated(20))
combine_changes - next: (JustUpdated("Szia!"), Latest(20))
combine_changes - next: (Latest("Szia!"), JustUpdated(30))
combine_changes - completed
combine_changes - unsubscribed

observable_combine_latest

The CombineLatestObservable subscribes to two input observables, and emits the latest of both values when either of them emits. It only starts emitting once both have emitted at least once.
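The "only starts emitting once both have emitted" rule can be sketched without any Rx machinery. The following std-only model is an illustration under stated assumptions, not the rx_core implementation: `Input` and `combine_latest_model` are hypothetical names, and the inputs are a pre-recorded list of emissions.

```rust
/// Which input emitted (hypothetical helper for this sketch).
enum Input<A, B> {
    Left(A),
    Right(B),
}

/// Collects what a combine_latest-style combinator would emit: nothing until
/// both sides have a value, then the latest pair on every emission.
fn combine_latest_model<A: Clone, B: Clone>(events: Vec<Input<A, B>>) -> Vec<(A, B)> {
    let mut latest_left: Option<A> = None;
    let mut latest_right: Option<B> = None;
    let mut emitted = Vec::new();

    for event in events {
        match event {
            Input::Left(value) => latest_left = Some(value),
            Input::Right(value) => latest_right = Some(value),
        }
        // Emit only once both inputs are primed.
        if let (Some(left), Some(right)) = (&latest_left, &latest_right) {
            emitted.push((left.clone(), right.clone()));
        }
    }
    emitted
}

fn main() {
    let emitted = combine_latest_model(vec![
        Input::Left("Hello!"),
        Input::Right(10),
        Input::Right(20),
        Input::Left("Szia!"),
        Input::Right(30),
    ]);
    println!("{emitted:?}");
}
```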

See Also

  • CombineChangesObservable - Subscribes to two different observables, and emits the latest of both values when either of them emits. It denotes which one has changed, and it emits even when one of them hasn’t emitted yet.
  • ZipObservable - Subscribes to two different observables, and emits both values when both of them have emitted, pairing up emissions by the order they happened.
  • JoinObservable - Subscribes to two different observables, and emits the latest of both values once both of them have completed!

Example

cargo run -p rx_core --example observable_combine_latest_example
let mut greetings_subject = PublishSubject::<&'static str>::default();
let mut count_subject = PublishSubject::<usize>::default();

let mut subscription = combine_latest(
  greetings_subject
    .clone()
    .tap(PrintObserver::new("greetings_subject")),
  count_subject
    .clone()
    .tap(PrintObserver::new("count_subject")),
)
.subscribe(PrintObserver::new("combine_latest"));

greetings_subject.next("Hello!");
count_subject.next(10);
count_subject.next(20);
greetings_subject.next("Szia!");
greetings_subject.complete();
count_subject.next(30);
count_subject.complete();
subscription.unsubscribe();

Output:

greetings_subject - next: "Hello!"
count_subject - next: 10
combine_latest - next: ("Hello!", 10)
count_subject - next: 20
combine_latest - next: ("Hello!", 20)
greetings_subject - next: "Szia!"
combine_latest - next: ("Szia!", 20)
greetings_subject - completed
greetings_subject - unsubscribed
count_subject - next: 30
combine_latest - next: ("Szia!", 30)
count_subject - completed
count_subject - unsubscribed
combine_latest - completed
combine_latest - unsubscribed

observable_concat

Combine many observables of the same output type into one by subscribing to them sequentially in order.

See Also

  • MergeObservable - Combine many observables of the same output type by subscribing to all of them at once.

Example

cargo run -p rx_core --example observable_concat_example
let mut subject_1 = PublishSubject::<usize>::default();
let mut subject_2 = PublishSubject::<usize>::default();
let mut subject_3 = PublishSubject::<usize>::default();

let _subscription = concat((
  subject_1.clone(),
  subject_2.clone().take(2),
  subject_3.clone(),
))
.subscribe(PrintObserver::new("concat_operator"));

subject_1.next(1);
subject_1.complete();
subject_3.complete();
subject_2.next(2);
subject_2.next(3);

Output:

concat_operator - next: 1
concat_operator - next: 2
concat_operator - next: 3
concat_operator - completed
concat_operator - unsubscribed

observable_connectable

Maintains an internal connector subject and only subscribes the source when connect is called, letting multiple subscribers share that connection.
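As a rough mental model, the connector sits between the source and the subscribers, and values only flow while connected. The sketch below is std-only and hypothetical: `ConnectableModel` is not the rx_core type, and it assumes a connector that replays its single latest value to late subscribers and keeps that value across disconnects.

```rust
/// Hypothetical std-only model of a connectable source with a replay-1
/// connector; it is not the rx_core `ConnectableObservable`.
#[derive(Default)]
struct ConnectableModel {
    connected: bool,
    replayed: Option<i32>,
    // One log of received values per subscriber.
    received: Vec<Vec<i32>>,
}

impl ConnectableModel {
    /// Registers a subscriber and replays the connector's stored value, if any.
    fn subscribe(&mut self) -> usize {
        self.received.push(self.replayed.into_iter().collect());
        self.received.len() - 1
    }

    fn connect(&mut self) {
        self.connected = true;
    }

    /// Assumption: the connector keeps its replayed value across disconnects.
    fn disconnect(&mut self) {
        self.connected = false;
    }

    /// A value produced by the source; it only flows while connected.
    fn source_next(&mut self, value: i32) {
        if !self.connected {
            return;
        }
        self.replayed = Some(value);
        for log in &mut self.received {
            log.push(value);
        }
    }
}

/// Subscribe, connect, disconnect, subscribe late, then connect again.
fn run_scenario() -> Vec<Vec<i32>> {
    let mut connectable = ConnectableModel::default();
    let _first = connectable.subscribe();
    connectable.source_next(0); // not connected yet, so nobody sees this
    connectable.connect();
    connectable.source_next(1);
    connectable.disconnect();
    let _second = connectable.subscribe(); // receives the replayed 1
    connectable.connect();
    connectable.source_next(2);
    connectable.received
}

fn main() {
    println!("{:?}", run_scenario());
}
```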

See Also

  • ShareOperator - Multicast a source through a connector so downstream subscribers share one upstream subscription. The connector can be any subject.

Example

cargo run -p rx_core --example observable_connectable_example
let mut source = PublishSubject::<usize>::default();
let mut connectable = ConnectableObservable::new(
  source.clone().finalize(|| println!("disconnected...")),
  ConnectableOptions {
    connector_provider: ProvideWithDefault::<ReplaySubject<1, _>>::default(),
    disconnect_when_ref_count_zero: true,
    reset_connector_on_disconnect: false,
    reset_connector_on_complete: false,
    reset_connector_on_error: false,
  },
);
let mut _subscription_0 = connectable.subscribe(PrintObserver::new("connectable_observable 0"));
source.next(0);

println!("connect!");
let _connection = connectable.connect();
source.next(1);
connectable.disconnect();

let mut _subscription_1 = connectable.subscribe(PrintObserver::new("connectable_observable 1"));
println!("connect again!");
connectable.connect();
source.next(2);

println!("end");

Output:

connect!
connectable_observable 0 - next: 1
disconnected...
connectable_observable 1 - next: 1
connect again!
connectable_observable 1 - next: 2
connectable_observable 0 - next: 2
end
connectable_observable 1 - unsubscribed
disconnected...
connectable_observable 0 - unsubscribed

observable_create

The create_observable provides a simple way to create custom observables by defining a producer function that can emit values to subscribers.

The producer function is cloned for each subscribe call to avoid shared state between individual subscriptions.

See Also

Example

cargo run -p rx_core --example observable_create_example
let _s = create_observable::<&str, Never, _>(|destination| {
    destination.next("hello");
    destination.complete();
})
.subscribe(PrintObserver::new("create_observable"));

Output:

create_observable - next: "hello"
create_observable - completed
create_observable - unsubscribed

observable_deferred

Subscribes to an observable returned by a function.
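The point of deferring is that the function runs at subscribe time, so it sees whatever state exists at that moment. A minimal std-only sketch of that idea, assuming a hypothetical `subscribe_deferred` helper where the "observable" is just an iterator factory:

```rust
use std::cell::RefCell;

/// Hypothetical model: the "observable" is a factory closure, and
/// "subscribing" calls it, so state is read at subscribe time.
fn subscribe_deferred<I, F>(factory: F) -> Vec<I::Item>
where
    I: Iterator,
    F: Fn() -> I,
{
    factory().collect()
}

/// Changes the upper bound after the factory was defined, but before subscribing.
fn deferred_range_after_update() -> Vec<i32> {
    let upper_bound = RefCell::new(1);
    let factory = || 0..=*upper_bound.borrow();
    *upper_bound.borrow_mut() = 2;
    subscribe_deferred(factory)
}

fn main() {
    println!("{:?}", deferred_range_after_update());
}
```

The result includes `2` because the range is only built when `subscribe_deferred` calls the factory, after the bound was updated.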

See Also

Example

cargo run -p rx_core --example observable_deferred_example
let i = RefCell::new(1);
let mut deferred = deferred_observable(|| {
  println!("subscribe!");
  (0..=*i.borrow()).into_observable()
});

*i.borrow_mut() = 2;
let _subscription = deferred.subscribe(PrintObserver::new("deferred_observable"));

Output:

subscribe!
deferred_observable - next: 0
deferred_observable - next: 1
deferred_observable - next: 2
deferred_observable - completed
deferred_observable - unsubscribed

observable_empty

Immediately completes.

See Also

Example

cargo run -p rx_core --example observable_empty_example
let _subscription = empty().subscribe(PrintObserver::new("empty"));

Output:

empty - completed
empty - unsubscribed

observable_interval

Emits a sequence of usize values every time the configured duration elapses.
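One way to picture how elapsed time turns into emissions, including a per-tick cap, is the following std-only model. It is a sketch and not the rx_core implementation: `IntervalModel` is a hypothetical type, and it assumes that leftover time below one period carries over to the next tick while due emissions above the cap are dropped.

```rust
use std::time::Duration;

/// Hypothetical std-only model of an interval source; not the rx_core type.
struct IntervalModel {
    period: Duration,
    max_emissions_per_tick: usize,
    accumulated: Duration,
    next_value: usize,
}

impl IntervalModel {
    fn new(period: Duration, max_emissions_per_tick: usize) -> Self {
        assert!(!period.is_zero(), "the period must be non-zero");
        Self {
            period,
            max_emissions_per_tick,
            accumulated: Duration::ZERO,
            next_value: 0,
        }
    }

    /// With `start_on_subscribe`, the first value is emitted right away.
    fn subscribe(&mut self, start_on_subscribe: bool) -> Vec<usize> {
        if start_on_subscribe {
            self.emit(1)
        } else {
            Vec::new()
        }
    }

    /// Advances time and returns the values emitted during this tick.
    fn tick(&mut self, delta: Duration) -> Vec<usize> {
        self.accumulated += delta;
        let period_nanos = self.period.as_nanos();
        let due = (self.accumulated.as_nanos() / period_nanos) as usize;
        // Assumption: the remainder carries over; due emissions above the cap are dropped.
        let remainder = (self.accumulated.as_nanos() % period_nanos) as u64;
        self.accumulated = Duration::from_nanos(remainder);
        self.emit(due.min(self.max_emissions_per_tick))
    }

    fn emit(&mut self, count: usize) -> Vec<usize> {
        let values = (self.next_value..self.next_value + count).collect();
        self.next_value += count;
        values
    }
}

/// Runs a 1s interval capped at 3 emissions per tick over the given tick lengths.
fn run_model(ticks_ms: &[u64]) -> Vec<Vec<usize>> {
    let mut interval = IntervalModel::new(Duration::from_secs(1), 3);
    let mut emissions = vec![interval.subscribe(true)];
    for &millis in ticks_ms {
        emissions.push(interval.tick(Duration::from_millis(millis)));
    }
    emissions
}

fn main() {
    println!("{:?}", run_model(&[600, 401, 16200, 1200, 2200]));
}
```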

See Also

Example

Run the example with:

cargo run -p rx_core --example observable_interval_example
let mut mock_executor = MockExecutor::new_with_logging();
let scheduler = mock_executor.get_scheduler_handle();

let mut interval_observable = IntervalObservable::new(
    IntervalObservableOptions {
        duration: Duration::from_secs(1),
        max_emissions_per_tick: 3,
        start_on_subscribe: true,
    },
    scheduler,
);
let _subscription = interval_observable.subscribe(PrintObserver::new("interval_observable"));

mock_executor.tick(Duration::from_millis(600));
mock_executor.tick(Duration::from_millis(401));
mock_executor.tick(Duration::from_millis(16200));
mock_executor.tick(Duration::from_millis(1200));
mock_executor.tick(Duration::from_millis(2200));

Output:

interval_observable - next: 0
Ticking... (600ms)
Ticking... (401ms)
interval_observable - next: 1
Ticking... (16.2s)
interval_observable - next: 2
interval_observable - next: 3
interval_observable - next: 4
Ticking... (1.2s)
interval_observable - next: 5
Ticking... (2.2s)
interval_observable - next: 6
interval_observable - next: 7
interval_observable - unsubscribed

observable_iterator

This crate provides functionality to convert iterators into observables using the IntoIteratorObservableExt extension trait.

See Also

Features

  • Extension Trait: IntoIteratorObservableExt provides the into_observable() method for any type that implements IntoIterator + Clone
  • Universal Support: Works with ranges, vectors, arrays, and any other iterator type
  • No Conflicts: Uses an extension trait approach to avoid conflicts with the main IntoObservable trait

Usage

use rx_core::prelude::*;

// Convert ranges into observables
(1..=5).into_observable::<()>().subscribe(PrintObserver::new("range"));

// Convert vectors into observables
vec![1, 2, 3].into_observable::<()>().subscribe(PrintObserver::new("vector"));

// Convert arrays into observables
[10, 20, 30].into_observable::<()>().subscribe(PrintObserver::new("array"));

Examples

cargo run -p rx_core --example observable_iterator_into_example
cargo run -p rx_core --example observable_iterator_example

observable_iterator_on_tick

Emits iterator items one per scheduler tick.
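Unlike an interval, this kind of source counts ticks rather than elapsed time. A minimal std-only sketch of that counting, assuming a hypothetical `OnTickModel` type (not the rx_core one) with an "emit on every nth tick" setting:

```rust
/// Hypothetical model of an iterator drained on scheduler ticks.
struct OnTickModel<I: Iterator> {
    items: I,
    emit_at_every_nth_tick: usize,
    ticks_seen: usize,
}

impl<I: Iterator> OnTickModel<I> {
    fn new(items: I, emit_at_every_nth_tick: usize) -> Self {
        assert!(emit_at_every_nth_tick > 0, "the tick interval must be at least 1");
        Self {
            items,
            emit_at_every_nth_tick,
            ticks_seen: 0,
        }
    }

    /// With `start_on_subscribe`, the first item is emitted immediately.
    fn subscribe(&mut self, start_on_subscribe: bool) -> Option<I::Item> {
        if start_on_subscribe {
            self.items.next()
        } else {
            None
        }
    }

    /// One scheduler tick; the tick's length is irrelevant in this model.
    fn tick(&mut self) -> Option<I::Item> {
        self.ticks_seen += 1;
        if self.ticks_seen % self.emit_at_every_nth_tick == 0 {
            self.items.next()
        } else {
            None
        }
    }
}

/// Emits from `0..=7` on every second tick, starting on subscribe.
fn emissions_after_ticks(ticks: usize) -> Vec<i32> {
    let mut model = OnTickModel::new(0..=7, 2);
    let mut emitted: Vec<i32> = model.subscribe(true).into_iter().collect();
    for _ in 0..ticks {
        emitted.extend(model.tick());
    }
    emitted
}

fn main() {
    println!("{:?}", emissions_after_ticks(5));
}
```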

See Also

Example

Run the example with:

cargo run -p rx_core --example observable_iterator_on_tick_example
let mut executor = MockExecutor::default();
let scheduler = executor.get_scheduler_handle();

let iterator_observable = IteratorOnTickObservable::new(
    0..=7,
    OnTickObservableOptions {
        start_on_subscribe: true,
        emit_at_every_nth_tick: 2,
    },
    scheduler,
);
let _subscription = iterator_observable
    .finalize(|| println!("fin"))
    .subscribe(PrintObserver::new("iterator_on_tick"));
println!("subscribed!");

executor.tick(Duration::from_millis(500));
executor.tick(Duration::from_millis(16));
executor.tick(Duration::from_millis(9001));
executor.tick(Duration::from_millis(0));
executor.tick(Duration::from_millis(10));

Output:

iterator_on_tick - next: 0
subscribed!
iterator_on_tick - next: 1
iterator_on_tick - next: 2
fin
iterator_on_tick - unsubscribed

observable_join

Emits the latest values from both inputs once both complete.

This observable only emits once both of its input observables have completed. It then emits a tuple of the last emission from each input observable, and completes.

This means that if even one of the observables hasn’t emitted before all of them have completed, only a complete notification will be observed!

If not all observables complete, nothing will be emitted even if all input observables were primed.
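For finite, synchronous inputs, these rules reduce to "pair up the last value of each side, if both exist". A minimal std-only sketch, where `join_last` is a hypothetical helper and not part of rx_core:

```rust
/// Models join for two finite, synchronous sources: the pair of last values,
/// or `None` when either source completed without emitting.
fn join_last<A, B>(
    left: impl IntoIterator<Item = A>,
    right: impl IntoIterator<Item = B>,
) -> Option<(A, B)> {
    let last_left = left.into_iter().last();
    let last_right = right.into_iter().last();
    last_left.zip(last_right)
}

fn main() {
    println!("{:?}", join_last(1..=3, 4..=6));
    // One side never emits, so there is no pair to emit.
    println!("{:?}", join_last(1..=3, Vec::<i32>::new()));
}
```

A source that never completes has no counterpart in this model, since an iterator must finish before `last` can return.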

See Also

  • CombineChangesObservable - Emits the latest of two sources, tagging which side changed, even before both have emitted.
  • CombineLatestObservable - Emits the latest of two sources whenever either emits, after both emitted at least once.
  • ZipObservable - Emits paired tuples when both sources emit, matched by emission order.

Example

cargo run -p rx_core --example observable_join_example
let observable_1 = (1..=3).into_observable();
let observable_2 = (4..=6).into_observable();

let _subscription = join(observable_1, observable_2).subscribe(PrintObserver::new("join"));

Output:

join - next: (3, 6)
join - completed
join - unsubscribed

observable_just

Immediately emits a single value.

See Also

Example

cargo run -p rx_core --example observable_just_example
let _s = just("hello").subscribe(PrintObserver::new("just"));

Output:

just - next: "hello"
just - completed
just - unsubscribed

observable_merge

Combine many observables of the same output type into one by subscribing to all of them at once.

See Also

  • ConcatObservable - Combine many observables of the same output type by subscribing to them sequentially in order.

Example

cargo run -p rx_core --example observable_merge_example
let observable_1 = (1..=3).into_observable().skip(2);
let observable_2 = (4..=6).into_observable().take(1);
let observable_3 = (95..=98).into_observable();

let _subscription = merge((observable_1, observable_2, observable_3), usize::MAX)
    .subscribe(PrintObserver::<i32>::new("merge_observable"));

Output:

merge_observable - next: 3
merge_observable - next: 4
merge_observable - next: 95
merge_observable - next: 96
merge_observable - next: 97
merge_observable - next: 98
merge_observable - completed
merge_observable - unsubscribed

observable_never

Never emits and never unsubscribes on its own; it only unsubscribes once the subscription is dropped!

Warning: You are responsible for unsubscribing from subscriptions made to this observable, as it will never do so on its own!

See Also

Example

cargo run -p rx_core --example observable_never_example
let _subscription = never().subscribe(PrintObserver::new("never"));
println!("nothing happens before dropping the subscription!");

Output:

nothing happens before dropping the subscription!
never - unsubscribed

observable_throw

Immediately errors.

See Also

Example

cargo run -p rx_core --example observable_throw_example
let _subscription = throw("hello").subscribe(PrintObserver::new("throw_example"));

Output:

throw_example - error: "hello"

observable_timer

Emits once after the timer elapses.

See Also

Example

Run the example with:

cargo run -p rx_core --example observable_timer_example
let mut mock_executor = MockExecutor::new_with_logging();
let scheduler = mock_executor.get_scheduler_handle();

let mut timer = TimerObservable::new(Duration::from_secs(1), scheduler);
let _subscription = timer.subscribe(PrintObserver::new("timer_observable"));

mock_executor.tick(Duration::from_millis(600));
mock_executor.tick(Duration::from_millis(400));

Output:

Ticking... (600ms)
Ticking... (400ms)
timer_observable - next: ()
timer_observable - completed
timer_observable - unsubscribed

observable_zip

Subscribes to two observables, emitting paired tuples when both have emitted, matching them in emission order.
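When the two inputs emit at different times, pairing by emission order means buffering whichever side is ahead. The following std-only sketch illustrates that with two queues; `ZipModel` is a hypothetical type and not the rx_core implementation.

```rust
use std::collections::VecDeque;

/// Hypothetical push-based model of zip: each side queues its values until
/// the other side has a value with the same position.
struct ZipModel<A, B> {
    left: VecDeque<A>,
    right: VecDeque<B>,
}

impl<A, B> ZipModel<A, B> {
    fn new() -> Self {
        Self {
            left: VecDeque::new(),
            right: VecDeque::new(),
        }
    }

    fn push_left(&mut self, value: A) -> Option<(A, B)> {
        self.left.push_back(value);
        self.try_pair()
    }

    fn push_right(&mut self, value: B) -> Option<(A, B)> {
        self.right.push_back(value);
        self.try_pair()
    }

    /// Emits the oldest unpaired values once both queues are non-empty.
    fn try_pair(&mut self) -> Option<(A, B)> {
        if self.left.is_empty() || self.right.is_empty() {
            return None;
        }
        self.left.pop_front().zip(self.right.pop_front())
    }
}

/// The left side runs ahead at first; pairs still follow emission order.
fn zip_in_arrival_order() -> Vec<(i32, i32)> {
    let mut zip = ZipModel::new();
    let results = vec![
        zip.push_left(1),
        zip.push_left(2),
        zip.push_right(4),
        zip.push_right(5),
        zip.push_left(3),
        zip.push_right(6),
    ];
    results.into_iter().flatten().collect()
}

fn main() {
    println!("{:?}", zip_in_arrival_order());
}
```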

See Also

  • CombineChangesObservable - Emits the latest of two sources, tagging which side changed, even before both have emitted.
  • CombineLatestObservable - Emits the latest of two sources whenever either emits, after both emitted at least once.
  • JoinObservable - Emits the last values from both sources once both have completed.

Example

cargo run -p rx_core --example observable_zip_example
let observable_1 = (1..=3).into_observable();
let observable_2 = (4..=6).into_observable();

let _subscription = zip(observable_1, observable_2)
    .subscribe(PrintObserver::new("zip_observable"));

Output:

zip_observable - next: (1, 4)
zip_observable - next: (2, 5)
zip_observable - next: (3, 6)
zip_observable - completed
zip_observable - unsubscribed

Observables (Bevy Specific)

observable_event

The EventObservable turns Bevy events triggered on an entity into signals, allowing you to use any event as an observable source, and construct reactive pipelines from them using operators.

Subscribers will observe events targeted at the specified entity, and a completion signal once the entity is despawned.

See Also

Example

cargo run -p rx_bevy --example observable_event_example
#[derive(Resource, Deref, DerefMut)]
pub struct DummyEventTarget(Entity);

#[derive(Resource, Default, Deref, DerefMut)]
pub struct ExampleSubscriptions(SharedSubscription);

fn setup(
    mut commands: Commands,
    rx_schedule_update_virtual: RxSchedule<Update, Virtual>,
    mut subscriptions: ResMut<ExampleSubscriptions>,
) {
    commands.spawn((
        Camera3d::default(),
        Transform::from_xyz(2., 6., 8.).looking_at(Vec3::ZERO, Vec3::Y),
    ));

    let watched_entity = commands.spawn(Name::new("Watch me")).id();

    subscriptions.add(
        EventObservable::<DummyEvent>::new(watched_entity, rx_schedule_update_virtual.handle())
            .subscribe(PrintObserver::new("event_observable")),
    );

    commands.insert_resource(DummyEventTarget(watched_entity));
}

Then, provided something is triggering DummyEvents on the watched entity:

Producer is sending DummyEvent { target: 6v1#4294967302 } to 6v1!
event_observable - next: DummyEvent { target: 6v1#4294967302 }
Producer is sending DummyEvent { target: 6v1#4294967302 } to 6v1!
event_observable - next: DummyEvent { target: 6v1#4294967302 }
...

observable_keyboard

The KeyboardObservable turns Bevy keyboard input events into signals. The events are sourced from the ButtonInput<KeyCode> resource.

Options

KeyCode signals can be observed in multiple modes:

  • KeyboardObservableEmit::JustPressed - emits once when the key is pressed down.
  • KeyboardObservableEmit::JustReleased - emits once when the key is released.
  • KeyboardObservableEmit::WhilePressed - emits continuously while the key is held down.
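The difference between the three modes comes down to comparing the keys held in the previous frame with those held in the current one. A minimal std-only sketch of that comparison, assuming a hypothetical `EmitMode` enum and plain `char` keys instead of Bevy's `KeyCode` and `ButtonInput`:

```rust
use std::collections::BTreeSet;

/// Hypothetical mirror of the three modes; not the rx_bevy enum.
#[derive(Clone, Copy)]
enum EmitMode {
    JustPressed,
    JustReleased,
    WhilePressed,
}

/// Returns the keys that would be emitted for one frame, in sorted order.
fn keys_to_emit(
    mode: EmitMode,
    previous: &BTreeSet<char>,
    current: &BTreeSet<char>,
) -> Vec<char> {
    match mode {
        // Held now, but not in the previous frame.
        EmitMode::JustPressed => current.difference(previous).copied().collect(),
        // Held in the previous frame, but not anymore.
        EmitMode::JustReleased => previous.difference(current).copied().collect(),
        // Everything currently held, every frame.
        EmitMode::WhilePressed => current.iter().copied().collect(),
    }
}

/// One frame where 'w' is released, 'a' stays held and 'd' is newly pressed.
fn example_frame(mode: EmitMode) -> Vec<char> {
    let previous = BTreeSet::from(['w', 'a']);
    let current = BTreeSet::from(['a', 'd']);
    keys_to_emit(mode, &previous, &current)
}

fn main() {
    println!("just pressed: {:?}", example_frame(EmitMode::JustPressed));
    println!("just released: {:?}", example_frame(EmitMode::JustReleased));
    println!("while pressed: {:?}", example_frame(EmitMode::WhilePressed));
}
```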

See Also

Example

cargo run -p rx_bevy --example observable_keyboard_example
fn main() -> AppExit {
    App::new()
        .add_plugins((
            DefaultPlugins,
            RxPlugin,
            RxSchedulerPlugin::<Update, Virtual>::default(),
        ))
        .add_systems(Startup, setup)
        .add_systems(
            Update,
            (
                send_message(AppExit::Success).run_if(input_just_pressed(KeyCode::Escape)),
                unsubscribe.run_if(input_just_pressed(KeyCode::Space)),
            ),
        )
        .run()
}

fn unsubscribe(mut example_entities: ResMut<MySubscriptions>) {
    example_entities.subscription.unsubscribe();
}

#[derive(Resource)]
struct MySubscriptions {
    subscription: SharedSubscription,
}

fn setup(mut commands: Commands, rx_schedule_update_virtual: RxSchedule<Update, Virtual>) {
    let subscription = KeyboardObservable::new(default(), rx_schedule_update_virtual.handle())
        .subscribe(PrintObserver::new("keyboard"));

    commands.insert_resource(MySubscriptions {
        subscription: SharedSubscription::new(subscription),
    });
}

Output when pressing WASD keys and Space:

keyboard - next: KeyW
keyboard - next: KeyA
keyboard - next: KeyS
keyboard - next: KeyD
keyboard - next: Space
keyboard - unsubscribed

observable_message

The MessageObservable lets you observe global messages written by a MessageWriter!

  • Messages written in or before the observable’s schedule will be observed in the same frame. If the message was written in a later schedule, it will be observed in the next frame, which could lead to one-frame flickers if something else also reacts to the message in the same frame it was written!

See Also

Example

cargo run -p rx_bevy --example observable_message_example

observable_proxy

The ProxyObservable can subscribe to another observable entity of matching type!

See Also

Example

cargo run -p rx_bevy --example observable_proxy_example
let keyboard_observable_entity = commands
    .spawn((
        Name::new("KeyboardObservable"),
        KeyboardObservable::new(default(), rx_schedule_update_virtual.handle())
            .into_component(),
    ))
    .id();

let _s = ProxyObservable::<KeyCode, Never>::new(
    keyboard_observable_entity,
    rx_schedule_update_virtual.handle(),
).subscribe(PrintObserver::new("proxy_observable"));

observable_resource

The ResourceObservable calls a “reader” function on a resource every time it is added or mutated, emitting the result to subscribers.

See Also

Options

  • trigger_on_is_added: Emit also when the resource was just added. (Default: true)
  • trigger_on_is_changed: Emit on each tick where the resource was accessed mutably, except when the resource was just added. (Default: true)

Example

cargo run -p rx_bevy --example observable_resource_example
ResourceObservable::<DummyResource, _, usize>::new(
    |res| res.count,
    ResourceObservableOptions {
        trigger_on_is_added: true,
        trigger_on_is_changed: true,
    },
    rx_schedule_update_virtual.handle(),
)
.subscribe(PrintObserver::new("resource_observable"));

Observers

observer_fn

Define observers from callbacks: FnObserver (static dispatch) and DynFnObserver (dynamic dispatch).

See Also

Example

cargo run -p rx_core --example observer_fn_example
let _subscription = just("world").subscribe(FnObserver::new(
  |next| println!("hello: {next}"),
  |_error| println!("error"),
  || {},
));

Output:

hello: world

observer_noop

Ignore all signals; panics on errors in debug mode.

See Also

Example

cargo run -p rx_core --example observer_noop_example
let _subscription = just(1).subscribe(NoopObserver::default());

Output:

*crickets*

observer_print

Print all observed signals to stdout for quick debugging.

See Also

Example

cargo run -p rx_core --example observer_print_example
let _subscription = just(1).subscribe(PrintObserver::new("hello"));

Output:

hello - next: 1
hello - completed
hello - unsubscribed

Observers (Bevy Specific)

EntityDestination

Send observed signals to an entity as RxSignal events.

EntityDestination is an RxObserver that wraps a Bevy Entity. When signals are observed, they are forwarded to the destination entity as events, allowing you to react to them using a Bevy observer system.

See Also

Usage

fn setup(rx_schedule_update_virtual: RxSchedule<Update, Virtual>, mut commands: Commands) {
    let destination_entity = commands
        .spawn_empty()
        .observe(|signal: On<RxSignal<i32>>| println!("Received value: {:?}", signal.event()))
        .id();
    
    // or `just(1)` if you have `observable_fn` feature enabled
    let _s = JustObservable::new(1).subscribe(EntityDestination::new(
        destination_entity,
        rx_schedule_update_virtual.handle(),
    ));
}

ResourceDestination

Write into a Bevy resource when observing signals.

ResourceDestination is an RxObserver that allows you to write observed signals directly into a Bevy Resource.

See Also

Usage


#[derive(Resource, Default, Debug)]
struct Counter(i32);

fn setup(rx_schedule_update_virtual: RxSchedule<Update, Virtual>) {
    let _s = JustObservable::new(1).subscribe(ResourceDestination::new(
        |mut counter: Mut<'_, Counter>, signal| {
            println!("Received signal: {:?}", signal);
            if let ObserverNotification::Next(value) = signal {
                counter.0 = value;
            }
            println!("Counter updated to: {:?}", counter);
        },
        rx_schedule_update_virtual.handle(),
    ));
}

Operators (Core)

operator_adsr

Convert trigger signals into an ADSR envelope driven by the scheduler.

See Also

Example

cargo run -p rx_core --example operator_adsr_example
use std::time::Duration;

use rx_core::prelude::*;
use rx_core_testing::MockExecutor;

fn main() {
  let mut executor = MockExecutor::default();
  let scheduler = executor.get_scheduler_handle();

  let envelope = AdsrEnvelope {
    attack_time: Duration::from_millis(10),
    decay_time: Duration::from_millis(10),
    sustain_volume: 0.5,
    release_time: Duration::from_millis(15),
    ..Default::default()
  };

  let mut source = PublishSubject::<AdsrTrigger>::default();

  let mut subscription = source
    .clone()
    .adsr(
      AdsrOperatorOptions {
        envelope,
        ..Default::default()
      },
      scheduler.clone(),
    )
    .subscribe(PrintObserver::new("adsr"));

  source.next(true.into());
  executor.tick(Duration::from_millis(10));
  executor.tick(Duration::from_millis(10));

  source.next(false.into());
  executor.tick(Duration::from_millis(15));

  subscription.unsubscribe();
}

Output:

adsr - next: AdsrSignal { adsr_envelope_phase: Attack, phase_transition: AdsrEnvelopePhaseTransition(1), t: 0ns, value: 0.0 }
adsr - next: AdsrSignal { adsr_envelope_phase: Decay, phase_transition: AdsrEnvelopePhaseTransition(2), t: 10ms, value: 1.0 }
adsr - next: AdsrSignal { adsr_envelope_phase: None, phase_transition: AdsrEnvelopePhaseTransition(16), t: 0ns, value: 0.0 }
adsr - unsubscribed

operator_buffer_count

Collect values into fixed-size buffers before emitting them.
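The buffering logic itself is small: fill a buffer, emit it when full, and flush whatever is left when the source completes. A minimal std-only sketch over a plain iterator, where `buffer_count_model` is a hypothetical helper and not the rx_core operator:

```rust
/// Groups values into buffers of `size`, flushing a final partial buffer
/// when the input ends.
fn buffer_count_model<T>(values: impl IntoIterator<Item = T>, size: usize) -> Vec<Vec<T>> {
    assert!(size > 0, "the buffer size must be at least 1");
    let mut buffers = Vec::new();
    let mut current = Vec::with_capacity(size);

    for value in values {
        current.push(value);
        if current.len() == size {
            // Emit the full buffer and start a fresh one.
            buffers.push(std::mem::replace(&mut current, Vec::with_capacity(size)));
        }
    }
    // On completion, the partial buffer is flushed if it holds anything.
    if !current.is_empty() {
        buffers.push(current);
    }
    buffers
}

fn main() {
    for buffer in buffer_count_model(1..=7, 3) {
        println!("{buffer:?}");
    }
}
```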

Example

cargo run -p rx_core --example operator_buffer_count_example
let _s = (1..=25)
    .into_observable()
    .buffer_count(3)
    .subscribe(PrintObserver::new("buffer_count_operator"));

Output:

buffer_count_operator - next: [1, 2, 3]
buffer_count_operator - next: [4, 5, 6]
buffer_count_operator - next: [7, 8, 9]
buffer_count_operator - next: [10, 11, 12]
buffer_count_operator - next: [13, 14, 15]
buffer_count_operator - next: [16, 17, 18]
buffer_count_operator - next: [19, 20, 21]
buffer_count_operator - next: [22, 23, 24]
buffer_count_operator - next: [25]
buffer_count_operator - completed
buffer_count_operator - unsubscribed

operator_catch

On error, switch to a recovery observable.
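In other words, values pass through until the first error, at which point the source is abandoned and the recovery sequence takes over. The following std-only sketch models this with an iterator of `Result`s; `catch_model` and `example_pipeline` are hypothetical names, not rx_core APIs.

```rust
/// Forwards `Ok` values until the first `Err`, then switches to the sequence
/// produced by `recover` and ignores the rest of the source.
fn catch_model<T, E, R>(
    source: impl IntoIterator<Item = Result<T, E>>,
    recover: impl FnOnce(E) -> R,
) -> Vec<T>
where
    R: IntoIterator<Item = T>,
{
    let mut emitted = Vec::new();
    for item in source {
        match item {
            Ok(value) => emitted.push(value),
            Err(error) => {
                // The source is abandoned; the recovery sequence takes over.
                emitted.extend(recover(error));
                return emitted;
            }
        }
    }
    emitted
}

/// Maps values before the catch, so recovery values are not multiplied.
fn example_pipeline() -> Vec<i32> {
    let source = vec![Ok(1), Ok(2), Ok(3), Err("error"), Ok(4)];
    let mapped = source
        .into_iter()
        .map(|item: Result<i32, &str>| item.map(|i| i * 10));
    catch_model(mapped, |_error| 90..=92)
}

fn main() {
    println!("{:?}", example_pipeline());
}
```

The trailing `Ok(4)` never shows up in the result, because nothing from the source is read after the error.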

See Also

Example

cargo run -p rx_core --example operator_catch_example
let _s = concat((
    (1..=3).into_observable().map_never(),
    throw("error").map_never(),
))
.map(|i| i * 10)
.catch(|_error| IteratorObservable::new(90..=92))
.subscribe(PrintObserver::new("catch"));

Output:

catch - next: 10
catch - next: 20
catch - next: 30
catch - next: 90
catch - next: 91
catch - next: 92
catch - completed
catch - unsubscribed

operator_composite

Build reusable operator chains without needing a source observable.
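The underlying idea is plain function composition: describe the stages first, then apply the result to any number of sources later. A minimal std-only sketch of that idea using closures, where `compose` and `add_one_then_times_hundred` are hypothetical helpers and not rx_core APIs:

```rust
/// Composes two stages into one reusable stage (hypothetical helper).
fn compose<A, B, C>(first: impl Fn(A) -> B, second: impl Fn(B) -> C) -> impl Fn(A) -> C {
    move |input| second(first(input))
}

/// A reusable pipeline defined before any source exists.
fn add_one_then_times_hundred() -> impl Fn(i32) -> i32 {
    compose(|next: i32| next + 1, |next: i32| next * 100)
}

fn main() {
    let pipeline = add_one_then_times_hundred();
    // The same pipeline can be applied to several sources.
    let from_single: Vec<i32> = std::iter::once(1).map(&pipeline).collect();
    let from_range: Vec<i32> = (1..=3).map(&pipeline).collect();
    println!("{from_single:?} {from_range:?}");
}
```

As with composite operators, the composed type grows with every stage, which is hidden here behind `impl Fn`.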

See Also

  • IdentityOperator - A no-op operator, used mainly as the entry point of a CompositeOperator.

Example

cargo run -p rx_core --example operator_composite_operators_example
use rx_core::prelude::*;

/// Composite operators offer an easy way to create complex operators, but they
/// also increase type complexity. They are good for prototyping and smaller
/// things, but otherwise you should prefer implementing an actual operator.
fn main() {
  // Though not necessary, the IdentityOperator provides an easy way to define
  // input types for our composite operator.
  let op = IdentityOperator::<i32, Never>::default()
    .map(|next: i32| next + 1)
    .map(|next: i32| next * 100);

  let _s = just(1).pipe(op).subscribe(PrintObserver::new("hello"));

  // Or, through the type extensions, you can chain built-in operators just like on observables
  let op_2 = IdentityOperator::<i32, Never>::default()
    .map(|i| i * 2)
    .filter(|i, _| i % 2 == 0);

  let _s2 = just(1).pipe(op_2).subscribe(PrintObserver::new("bello"));
}

Output:

hello - next: 200
hello - completed
hello - unsubscribed
bello - next: 2
bello - completed
bello - unsubscribed

operator_concat_all

Example

cargo run -p rx_core --example operator_concat_all_example
let mut mock_executor = MockExecutor::new_with_logging();
let scheduler = mock_executor.get_scheduler_handle();
let mut enqueue_timer_of_length = PublishSubject::<usize>::default();

let mut _subscription = enqueue_timer_of_length
    .clone()
    .finalize(|| println!("finalize: upstream"))
    .tap_next(|n| println!("emit (source): {n:?}"))
    .map(move |next| {
        interval(
            IntervalObservableOptions {
                duration: Duration::from_secs(1),
                start_on_subscribe: false,
                max_emissions_per_tick: 10,
            },
            scheduler.clone(),
        )
        .finalize(move || println!("timer of {next} finished!"))
        .take(next)
        .map(move |i| format!("{i} (Timer of {next})"))
    })
    .concat_all(Never::map_into())
    .finalize(|| println!("finalize: downstream"))
    .subscribe(PrintObserver::new("concat_all"));

enqueue_timer_of_length.next(4);
enqueue_timer_of_length.next(1);
enqueue_timer_of_length.next(3);
enqueue_timer_of_length.complete();
mock_executor.tick(Duration::from_secs(4));
mock_executor.tick(Duration::from_secs(1));
mock_executor.tick(Duration::from_secs(3));

Output:

emit (source): 4
emit (source): 1
emit (source): 3
Ticking... (4s)
concat_all - next: "0 (Timer of 4)"
concat_all - next: "1 (Timer of 4)"
concat_all - next: "2 (Timer of 4)"
concat_all - next: "3 (Timer of 4)"
timer of 4 finished!
Ticking... (1s)
concat_all - next: "0 (Timer of 1)"
timer of 1 finished!
Ticking... (3s)
concat_all - next: "0 (Timer of 3)"
concat_all - next: "1 (Timer of 3)"
concat_all - next: "2 (Timer of 3)"
concat_all - completed
finalize: downstream
finalize: upstream
concat_all - unsubscribed
timer of 3 finished!
end

operator_concat_map

Map each value to an inner observable and subscribe to them one at a time in order.
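For synchronous sequences, this ordering is what `Iterator::flat_map` does: each inner sequence is drained completely before the next one starts. The sketch below uses only the standard library; `concat_map_model` is a hypothetical helper, and it cannot show the asynchronous case where later inner observables wait in a queue.

```rust
/// Maps each length to an inner sequence and drains the inner sequences
/// one at a time, in the order their outer values arrived.
fn concat_map_model(lengths: &[usize]) -> Vec<String> {
    lengths
        .iter()
        .flat_map(|&length| (0..length).map(move |i| format!("{i} (Timer of {length})")))
        .collect()
}

fn main() {
    for line in concat_map_model(&[2, 1, 3]) {
        println!("{line}");
    }
}
```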

See Also

  • ConcatAllOperator - Subscribes to upstream observables one at a time in order.
  • MergeAllOperator - Subscribes to upstream observables and merges their emissions concurrently.
  • SwitchAllOperator - Switch to the newest inner observable, unsubscribing previous ones.
  • ExhaustAllOperator - Ignore new inner observables while one is active.
  • MergeMapOperator - Map each value to an inner observable and merge their emissions concurrently.
  • SwitchMapOperator - Map each value to an inner observable and switch to the latest, unsubscribing previous ones.
  • ExhaustMapOperator - Map each value to an inner observable and ignore new ones while one is active.
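
As a rough mental model only (plain Rust iterators, not the rx_core API): for synchronous data, mapping each value to an inner sequence and draining those sequences one at a time, in order, is what `Iterator::flat_map` does. The name `concat_map_model` is hypothetical.

```rust
/// Synchronous stand-in for `concat_map`: every upstream value is projected
/// to an inner sequence, and each inner sequence is drained completely
/// before the next one starts. Illustration only, not rx_core code.
fn concat_map_model<T, I, F>(upstream: Vec<T>, project: F) -> Vec<I::Item>
where
    I: IntoIterator,
    F: FnMut(T) -> I,
{
    upstream.into_iter().flat_map(project).collect()
}
```

Calling it with `vec![2usize, 1]` and the projection `|n| (0..n).map(move |i| format!("{i} (Timer of {n})"))` yields `"0 (Timer of 2)"`, `"1 (Timer of 2)"`, then `"0 (Timer of 1)"`. The second inner sequence only starts once the first is exhausted.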

Example

cargo run -p rx_core --example operator_concat_map_example
#[derive(Clone, Debug)]
enum Either {
  Left,
  Right,
}

let mut upstream_subject = PublishSubject::<Either>::default();
let mut inner_left_subject = PublishSubject::<i32>::default();
let mut inner_right_subject = PublishSubject::<i32>::default();

let l = inner_left_subject.clone();
let r = inner_right_subject.clone();
let mut _subscription = upstream_subject
    .clone()
    .finalize(|| println!("finalize: upstream"))
    .tap_next(|n| println!("emit (source): {n:?}"))
    .concat_map(
        move |next| match next {
            Either::Left => l.clone(),
            Either::Right => r.clone(),
        },
        Never::map_into(),
    )
    .finalize(|| println!("finalize: downstream"))
    .subscribe(PrintObserver::new("concat_map"));

upstream_subject.next(Either::Left);
inner_left_subject.next(1);
inner_right_subject.next(2);
inner_left_subject.next(3);
inner_right_subject.next(4);
upstream_subject.next(Either::Right);
inner_left_subject.next(5);
inner_right_subject.next(6);
inner_left_subject.next(7);
inner_right_subject.next(8);
inner_left_subject.complete();
inner_left_subject.next(9);
inner_right_subject.next(10);
inner_right_subject.complete();
upstream_subject.complete();

Output:

emit (source): Left
concat_map - next: 1
concat_map - next: 3
emit (source): Right
concat_map - next: 5
concat_map - next: 7
concat_map - next: 10
concat_map - completed
finalize: downstream
finalize: upstream
concat_map - unsubscribed

operator_count

The count operator counts upstream emissions and emits the total once upstream completes.

See Also

  • IsEmptyOperator - Emit a single boolean indicating if the source emitted anything before it had completed.
  • FilterOperator - Keep values that satisfy a predicate.
  • ReduceOperator - Fold values and emit only the final accumulator on completion.

Example

cargo run -p rx_core --example operator_count_example
let _subscription = (1..=6)
  .into_observable()
  .filter(|value, _index| value % 2 == 0)
  .count()
  .subscribe(PrintObserver::new("count_operator"));

Output:

count_operator - next: 3
count_operator - completed
count_operator - unsubscribed

operator_debounce_time

The debounce_time operator emits the most recent upstream value only after the specified duration passes without another emission.

Upstream completion and cancellation take effect immediately when no debounced value is pending. Otherwise, the operator completes or cancels once the pending debounced value has been emitted.

Upstream errors are immediately propagated downstream, cancelling any pending debounced value.
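
The timing rule can be sketched over a finished list of timestamped values. This is a plain-Rust model, not the operator's implementation: `debounce_model` is a hypothetical name, timestamps are assumed to be milliseconds in ascending order, and the handling of an arrival exactly at the deadline is an assumption.

```rust
/// Model of debouncing over a time-sorted list of `(arrival_ms, value)`.
/// Returns `(emission_ms, value)` for every value that survives.
fn debounce_model<T: Clone>(arrivals: &[(u64, T)], window_ms: u64) -> Vec<(u64, T)> {
    let mut emitted = Vec::new();
    for (index, (arrived_at, value)) in arrivals.iter().enumerate() {
        let due_at = arrived_at + window_ms;
        // A later arrival inside the window replaces this value.
        // Assumption: an arrival exactly at `due_at` comes too late to replace it.
        let superseded = arrivals
            .get(index + 1)
            .map_or(false, |(next_at, _)| *next_at < due_at);
        if !superseded {
            emitted.push((due_at, value.clone()));
        }
    }
    emitted
}
```

With arrivals `[(0, 1), (500, 2)]` and a 1000 ms window, only `2` survives, and it is emitted at 1500 ms.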

See Also

Example

cargo run -p rx_core --example operator_debounce_time_example
let mut executor = MockExecutor::new_with_logging();
let scheduler = executor.get_scheduler_handle();

let mut subject = PublishSubject::<usize>::default();

let _subscription = subject
    .clone()
    .debounce_time(Duration::from_millis(1000), scheduler)
    .subscribe(PrintObserver::new("debounce_time_operator"));

subject.next(1);
executor.tick(Duration::from_millis(500));
subject.next(2);
executor.tick(Duration::from_millis(1000));
subject.complete();

Output:

Ticking... (500ms)
Ticking... (1s)
debounce_time_operator - next: 2
debounce_time_operator - completed
debounce_time_operator - unsubscribed

operator_delay

The delay operator shifts upstream values forward in time by a specified duration.

Upstream completion and cancellation take effect immediately when no delayed values are pending. Otherwise, the operator completes or cancels once all delayed values have been emitted.

Upstream errors are immediately propagated downstream, cancelling any pending delayed values.

See Also

Example

cargo run -p rx_core --example operator_delay_example
let mut executor = MockExecutor::new_with_logging();
let scheduler = executor.get_scheduler_handle();
let _subscription = (1..=3)
    .into_observable()
    .delay(Duration::from_millis(1000), scheduler)
    .subscribe(PrintObserver::new("delay_operator"));
executor.tick(Duration::from_millis(1000));

Output:

Ticking... (1s)
delay_operator - next: 1
delay_operator - next: 2
delay_operator - next: 3
delay_operator - completed
delay_operator - unsubscribed

operator_dematerialize

Convert notifications back into real signals.
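
A minimal sketch of the idea in plain Rust, without the rx_core types. `Notification`, `Ending` and `dematerialize_model` are hypothetical names; the `Error` variant and the `StillOpen` case are assumptions made for the illustration.

```rust
/// Hypothetical stand-in for a notification value.
#[derive(Debug, PartialEq)]
enum Notification<T, E> {
    Next(T),
    Error(E),
    Complete,
}

/// How the replayed stream ended.
#[derive(Debug, PartialEq)]
enum Ending<E> {
    Completed,
    Errored(E),
    /// The notifications ran out without a terminal one (assumed case).
    StillOpen,
}

/// Replays notifications as signals: values are collected, and the first
/// terminal notification ends the stream.
fn dematerialize_model<T, E>(notifications: Vec<Notification<T, E>>) -> (Vec<T>, Ending<E>) {
    let mut values = Vec::new();
    for notification in notifications {
        match notification {
            Notification::Next(value) => values.push(value),
            // Anything after a terminal notification is dropped.
            Notification::Error(error) => return (values, Ending::Errored(error)),
            Notification::Complete => return (values, Ending::Completed),
        }
    }
    (values, Ending::StillOpen)
}
```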

See Also

Example

cargo run -p rx_core --example operator_dematerialize_example
let _subscription = [
  ObserverNotification::<_, Never>::Next(1),
  ObserverNotification::Complete,
]
.into_observable()
.dematerialize()
.subscribe(PrintObserver::new("dematerialize_operator"));

Output:

dematerialize_operator - next: 1
dematerialize_operator - completed
dematerialize_operator - unsubscribed

operator_element_at

Emit the value at the given index then complete.

If no element exists at the specified index because upstream completed before reaching it, the operator either errors with ElementAtOperatorError::IndexOutOfRange or emits a default value, if one was provided.
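
The same decision can be sketched with a plain iterator. This is an analogy, not the operator's API: `element_at_model` and the unit struct `IndexOutOfRange` are hypothetical, and passing the default as an `Option` argument is an assumption made to keep the sketch short.

```rust
/// Hypothetical error standing in for an "index out of range" error.
#[derive(Debug, PartialEq)]
struct IndexOutOfRange;

/// Returns the item at `index`, falls back to `default` when the source is
/// too short, and reports an error when there is no default either.
fn element_at_model<I>(
    source: I,
    index: usize,
    default: Option<I::Item>,
) -> Result<I::Item, IndexOutOfRange>
where
    I: IntoIterator,
{
    source
        .into_iter()
        .nth(index)
        .or(default)
        .ok_or(IndexOutOfRange)
}
```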

See Also

Example

cargo run -p rx_core --example operator_element_at_example
let _subscription = vec!["a", "b", "c", "d", "e"]
    .into_observable()
    .element_at(2)
    .subscribe(PrintObserver::new("element_at_operator"));

Output:

element_at_operator - next: "c"
element_at_operator - completed
element_at_operator - unsubscribed

operator_end_with

Emit a value on completion.

See Also

Example

cargo run -p rx_core --example operator_end_with_example
let _subscription = (1..=5)
    .into_observable()
    .end_with(99)
    .subscribe(PrintObserver::new("end_with_operator"));

Output:

end_with_operator - next: 1
end_with_operator - next: 2
end_with_operator - next: 3
end_with_operator - next: 4
end_with_operator - next: 5
end_with_operator - next: 99
end_with_operator - completed
end_with_operator - unsubscribed

operator_enumerate

Attach a running index to each emission.

See Also

Example

cargo run -p rx_core --example operator_enumerate_example
let _subscription = (10..=15)
    .into_observable()
    .enumerate()
    .subscribe(PrintObserver::new("enumerate_operator"));

Output:

enumerate_operator - next: (10, 0)
enumerate_operator - next: (11, 1)
enumerate_operator - next: (12, 2)
enumerate_operator - next: (13, 3)
enumerate_operator - next: (14, 4)
enumerate_operator - next: (15, 5)
enumerate_operator - completed
enumerate_operator - unsubscribed

operator_error_boundary

Enforce Never as the error type to guard pipelines at compile time.
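
The compile-time guard can be illustrated with the standard library alone. In this sketch `std::convert::Infallible` stands in for `Never` (an assumption: both are treated as types that have no values), and `error_boundary_model` is a hypothetical name, not part of rx_core.

```rust
use std::convert::Infallible;

/// Accepts only sources whose error type has no values. A source typed as
/// `Result<T, String>` is rejected by the compiler, not at runtime.
fn error_boundary_model<T>(source: impl IntoIterator<Item = Result<T, Infallible>>) -> Vec<T> {
    source
        .into_iter()
        .map(|item| match item {
            Ok(value) => value,
            // `Infallible` has no variants, so this arm can never run.
            Err(never) => match never {},
        })
        .collect()
}
```

Passing a `Vec<Result<i32, String>>` to this function is a type error, which mirrors why a pipeline that can still error needs something like `into_result` before the boundary.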

See Also

Example

cargo run -p rx_core --example operator_error_boundary_example
use rx_core::prelude::*;

/// The `error_boundary` operator only accepts an upstream whose error type
/// is `Never`, so a pipeline that can still error fails to compile.
fn main() {
  let _s = (1..=5)
    .into_observable()
    .map(|i| i * 2)
    .error_boundary()
    .subscribe(PrintObserver::new("error_boundary_operator (composite)"));

  // This does not compile: at the `error_boundary` operator, the upstream
  // error type is `String`, not `Never`.
  // let _s2 = throw("error".to_string())
  //     .map(|i| i)
  //     .error_boundary()
  //     .subscribe(PrintObserver::new("error_boundary_operator (composite)"));

  let _s3 = throw("error".to_string())
    .map(|i| i)
    .into_result()
    .error_boundary()
    .subscribe(PrintObserver::new("error_boundary_operator (composite)"));
}

Output:

error_boundary_operator (composite) - next: 2
error_boundary_operator (composite) - next: 4
error_boundary_operator (composite) - next: 6
error_boundary_operator (composite) - next: 8
error_boundary_operator (composite) - next: 10
error_boundary_operator (composite) - completed
error_boundary_operator (composite) - unsubscribed
error_boundary_operator (composite) - next: Err("error")
error_boundary_operator (composite) - unsubscribed

operator_exhaust_all

Ignore new inner observables while one is active.

Example

cargo run -p rx_core --example operator_exhaust_all_example

operator_exhaust_map

Map each value to an inner observable and ignore new ones while one is active.

See Also

  • ConcatAllOperator - Subscribes to upstream observables one at a time in order.
  • MergeAllOperator - Subscribes to upstream observables and merges their emissions concurrently.
  • SwitchAllOperator - Switch to the newest inner observable, unsubscribing previous ones.
  • ExhaustAllOperator - Ignore new inner observables while one is active.
  • ConcatMapOperator - Map each value to an inner observable and subscribe to them one at a time in order.
  • MergeMapOperator - Map each value to an inner observable and merge their emissions concurrently.
  • SwitchMapOperator - Map each value to an inner observable and switch to the latest, unsubscribing previous ones.
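
The "ignore while active" rule can be sketched over timestamped arrivals. This is a plain-Rust model under stated assumptions, not the operator itself: `exhaust_model` is a hypothetical name, every inner task is assumed to run for the same fixed time, and an arrival at the exact moment the previous inner finishes is assumed to be accepted.

```rust
/// `arrivals` are `(arrival_ms, value)` pairs sorted by time. Every accepted
/// value starts an inner task that stays active for `inner_ms` milliseconds.
/// Returns the values that were accepted; the rest were ignored.
fn exhaust_model<T: Clone>(arrivals: &[(u64, T)], inner_ms: u64) -> Vec<T> {
    let mut busy_until: Option<u64> = None;
    let mut accepted = Vec::new();
    for (arrived_at, value) in arrivals {
        let idle = busy_until.map_or(true, |until| *arrived_at >= until);
        if idle {
            accepted.push(value.clone());
            busy_until = Some(arrived_at + inner_ms);
        }
    }
    accepted
}
```

For arrivals `[(0, 1), (2000, 2), (3000, 3), (3000, 4)]` and a 3000 ms inner task, the model accepts `1` and `3` and ignores `2` and `4`.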

Example

cargo run -p rx_core --example operator_exhaust_map_example
use std::time::Duration;

use rx_core::prelude::*;
use rx_core_testing::MockExecutor;

fn main() {
  let mut executor = MockExecutor::new_with_logging();
  let scheduler = executor.get_scheduler_handle();

  let mut source = PublishSubject::<i32>::default();

  let mut subscription = source
    .clone()
    .exhaust_map(
      move |next| {
        println!("Trying to switch to the {}. inner observable..", next);
        interval(
          IntervalObservableOptions {
            duration: Duration::from_millis(1000),
            max_emissions_per_tick: 10,
            start_on_subscribe: false,
          },
          scheduler.clone(),
        )
        .take(3)
      },
      Never::map_into(),
    )
    .subscribe(PrintObserver::new("exhaust_map"));

  source.next(1);
  executor.tick(Duration::from_millis(1000));
  executor.tick(Duration::from_millis(1000));
  source.next(2); // Ignored while an inner observable is active
  executor.tick(Duration::from_millis(1000));
  source.next(3); // Switches after the previous inner completes
  source.next(4); // Ignored because the new inner just started
  executor.tick(Duration::from_millis(1000));
  executor.tick(Duration::from_millis(1000));
  source.complete();
  executor.tick(Duration::from_millis(1000));

  source.unsubscribe();

  println!("end");

  subscription.unsubscribe();
}

Output:

Trying to switch to the 1. inner observable..
Ticking... (1s)
exhaust_map - next: 0
Ticking... (1s)
exhaust_map - next: 1
Trying to switch to the 2. inner observable..
Ticking... (1s)
exhaust_map - next: 2
Trying to switch to the 3. inner observable..
Trying to switch to the 4. inner observable..
Ticking... (1s)
exhaust_map - next: 0
Ticking... (1s)
exhaust_map - next: 1
Ticking... (1s)
exhaust_map - next: 2
exhaust_map - completed
exhaust_map - unsubscribed
end

operator_fallback_when_silent

Emit a fallback value on ticks where the source stayed silent.
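
A minimal sketch of the per-tick rule, assuming the source's emissions have already been grouped by tick. `fallback_when_silent_model` is a hypothetical helper written in plain Rust; it does not use a scheduler or any rx_core type.

```rust
/// Each entry of `ticks` holds the values the source emitted during that tick.
/// Ticks with values pass them through; silent ticks produce the fallback.
fn fallback_when_silent_model<T: Clone>(ticks: &[Vec<T>], fallback: T) -> Vec<T> {
    let mut output = Vec::new();
    for emitted_this_tick in ticks {
        if emitted_this_tick.is_empty() {
            output.push(fallback.clone());
        } else {
            output.extend(emitted_this_tick.iter().cloned());
        }
    }
    output
}
```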

See Also

Example

cargo run -p rx_core --example operator_fallback_when_silent_example
use std::time::Duration;

use rx_core::prelude::*;
use rx_core_testing::MockExecutor;

fn main() {
  let mut executor = MockExecutor::default();
  let scheduler = executor.get_scheduler_handle();

  let mut subject = PublishSubject::<i32>::default();

  let mut subscription = subject
    .clone()
    .fallback_when_silent(|_, _, _| Default::default(), scheduler)
    .subscribe(PrintObserver::<i32>::new("fallback_when_silent"));

  subject.next(1);
  executor.tick(Duration::from_millis(200));
  subject.next(2);
  executor.tick(Duration::from_millis(200));
  // Silence
  executor.tick(Duration::from_millis(200));
  subject.next(3);
  executor.tick(Duration::from_millis(200));

  subscription.unsubscribe();
}

Output:

fallback_when_silent - next: 1
fallback_when_silent - next: 2
fallback_when_silent - next: 0
fallback_when_silent - next: 3
fallback_when_silent - unsubscribed

operator_filter

Keep values that satisfy a predicate.

See Also

Example

cargo run -p rx_core --example operator_filter_example
let _subscription = (1..=5)
    .into_observable()
    .map(|next: i32| next + 1)
    .filter(|i, _| i > &2)
    .subscribe(PrintObserver::new("filter_operator"));

Output:

filter_operator - next: 3
filter_operator - next: 4
filter_operator - next: 5
filter_operator - next: 6
filter_operator - completed
filter_operator - unsubscribed

operator_filter_map

Map values to an Option and keep only the Some values.

See Also

Example

cargo run -p rx_core --example operator_filter_map_example
let _subscription = (1..=5)
    .into_observable()
    .filter_map(|i| if i % 2 == 0 { Some(i) } else { None })
    .subscribe(PrintObserver::new("filter_map_operator"));

Output:

filter_map_operator - next: 2
filter_map_operator - next: 4
filter_map_operator - completed
filter_map_operator - unsubscribed

operator_finalize

Execute cleanup when the observable finishes or unsubscribes.

See Also

  • TapOperator - Mirror all signals into another observer.
  • TapNextOperator - Run a callback for each next value while letting signals pass through.
  • OnNextOperator - Invoke a callback for each value that can also decide whether to forward it.
  • OnSubscribeOperator - Run a callback when a subscription is established.

Example

cargo run -p rx_core --example operator_finalize_completion_example
let _subscription = just(12)
    .finalize(|| println!("finally!"))
    .subscribe(PrintObserver::new("finalize_operator"));

Output:

finalize_operator - next: 12
finalize_operator - completed
finally!
finalize_operator - unsubscribed

operator_find

Emit the first value that matches a predicate.

See Also

Example

cargo run -p rx_core --example operator_find_example
let _subscription = (1..=5)
    .into_observable()
    .find(|i| i % 2 == 0)
    .subscribe(PrintObserver::new("find_operator"));

Output:

find_operator - next: 2
find_operator - completed
find_operator - unsubscribed

operator_find_index

Emit the index of the first value that matches a predicate.

See Also

Example

cargo run -p rx_core --example operator_find_index_example
let _subscription = (1..=5)
    .into_observable()
    .find_index(|i| i % 2 == 0)
    .subscribe(PrintObserver::new("find_index_operator"));

Output:

find_index_operator - next: 1
find_index_operator - completed
find_index_operator - unsubscribed

operator_first

Emit only the first value, then complete.

See Also

Example

cargo run -p rx_core --example operator_first_example
let _subscription = (1..=5)
    .into_observable()
    .first()
    .subscribe(PrintObserver::new("first_operator"));

Output:

first_operator - next: 1
first_operator - completed
first_operator - unsubscribed

operator_identity

The identity operator is a no-op operator.

In layman’s terms: speedy thing goes in, speedy thing comes out.

It is used to conveniently define the input types of a composite operator. This is why it is the only operator that has a standalone compose_operator function and no Observable extension method.

See Also

Example

cargo run -p rx_core --example operator_identity_example
// If it existed, this would be the same as: `just(1).identity().subscribe(...)`
let _s = IdentityOperator::default()
    .operate(just(1))
    .subscribe(PrintObserver::new("identity_operator"));

Output:

identity_operator - next: 1
identity_operator - completed
identity_operator - unsubscribed

Example (Composite)

cargo run -p rx_core --example operator_identity_composite_example
let composite_operator = compose_operator::<i32, Never>()
    .map(|i| i + 1)
    .filter(|i, _| i < &4);

let _s = (1..=5)
    .into_observable()
    .pipe(composite_operator)
    .subscribe(PrintObserver::new("identity_operator (composite)"));

Output:

identity_operator (composite) - next: 2
identity_operator (composite) - next: 3
identity_operator (composite) - completed
identity_operator (composite) - unsubscribed

operator_into_result

Error handling operator. Captures upstream values and errors, and forwards them downstream as a Result.

See Also

Example

cargo run -p rx_core --example operator_into_result_example
let _s = throw("error!".to_string())
    .into_result()
    .subscribe(PrintObserver::new("into_result_operator - throw"));

let _s = just(1)
    .into_result()
    .subscribe(PrintObserver::new("into_result_operator - just"));

Output:

into_result_operator - throw - next: Err("error!")
into_result_operator - throw - unsubscribed
into_result_operator - just - next: Ok(1)
into_result_operator - just - completed
into_result_operator - just - unsubscribed

operator_is_empty

The is_empty operator will emit a single boolean value indicating whether upstream emitted any items before completing:

  • If the upstream completes without emitting any items, is_empty will emit true and then complete.
  • If the upstream emits any items, is_empty will immediately emit false and complete.
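
The two cases map onto a one-line iterator check. This is an analogy in plain Rust with a hypothetical helper name, not the operator's implementation.

```rust
/// Returns `true` only when the source finishes without yielding anything.
fn is_empty_model<I: IntoIterator>(source: I) -> bool {
    // Pulling a single item is enough: the first value settles the answer,
    // so the rest of the source is never consumed.
    source.into_iter().next().is_none()
}
```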

Example

cargo run -p rx_core --example operator_is_empty_example
let _s = (1..=5)
    .into_observable()
    .is_empty() // Will stop the iterator, send `false`, then complete.
    .subscribe(PrintObserver::new("is_empty_operator - iterator"));

let _s = empty() // Immediately completes.
    .is_empty()
    .subscribe(PrintObserver::new("is_empty_operator - empty"));

Output:

is_empty_operator - iterator - next: false
is_empty_operator - iterator - completed
is_empty_operator - iterator - unsubscribed
is_empty_operator - empty - next: true
is_empty_operator - empty - completed
is_empty_operator - empty - unsubscribed

operator_lift_option

Filter out None and forward Some values.

See Also

Example

cargo run -p rx_core --example operator_lift_option_example
let _subscription = (1..=5)
    .into_observable()
    .map(|i| if i % 2 == 0 { Some(i) } else { None })
    .lift_option()
    .subscribe(PrintObserver::new("lift_option_operator"));

Output:

lift_option_operator - next: 2
lift_option_operator - next: 4
lift_option_operator - completed
lift_option_operator - unsubscribed

operator_lift_result

Split Result values into next and error signals.
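
A sketch of the split using plain Rust, with the assumption that the first `Err` ends the stream, as an error signal normally does. `lift_result_model` is a hypothetical name.

```rust
/// Collects `Ok` values as "next" signals. The first `Err` becomes the error
/// signal and ends the stream, so later items are never looked at.
fn lift_result_model<T, E>(
    source: impl IntoIterator<Item = Result<T, E>>,
) -> (Vec<T>, Option<E>) {
    let mut values = Vec::new();
    for item in source {
        match item {
            Ok(value) => values.push(value),
            Err(error) => return (values, Some(error)),
        }
    }
    (values, None)
}
```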

See Also

Example

cargo run -p rx_core --example operator_lift_result_example
use rx_core::prelude::*;

fn main() {
  let _s = (1..=5)
    .into_observable()
    .map(|i| {
      if i <= 3 {
        Result::<i32, String>::Ok(i)
      } else {
        Result::<i32, String>::Err("Larger than 3!".to_string())
      }
    })
    // `lift_result` moves `Err` values out of the "next" channel and into the
    // error channel. Upstream errors, if there were any, would still have to
    // be dealt with separately.
    .lift_result()
    .subscribe(PrintObserver::new("lift_result_operator"));
}

Output:

lift_result_operator - next: 1
lift_result_operator - next: 2
lift_result_operator - next: 3
lift_result_operator - error: "Larger than 3!"
lift_result_operator - unsubscribed

operator_map

Transform each value with a mapping function.

See Also

Example

cargo run -p rx_core --example operator_map_example
let _subscription = (1..=5)
    .into_observable()
    .map(|i| i * 2)
    .subscribe(PrintObserver::new("map_operator"));

Output:

map_operator - next: 2
map_operator - next: 4
map_operator - next: 6
map_operator - next: 8
map_operator - next: 10
map_operator - completed
map_operator - unsubscribed

operator_map_error

Transform each upstream error into another error value.

See Also

Example

cargo run -p rx_core --example operator_map_error_example
let _subscription = concat((
  (1..=5)
    .into_observable()
    .map_error(Never::map_into::<&'static str>()),
  throw("error").map(Never::map_into::<usize>()),
))
.skip(1)
.map_error(|error| format!("error? {error}"))
.subscribe(PrintObserver::new("map_error_operator"));

Output:

map_error_operator - next: 2
map_error_operator - next: 3
map_error_operator - next: 4
map_error_operator - next: 5
map_error_operator - error: "error? error"
map_error_operator - unsubscribed

operator_map_into

Map each value using Into.

See Also

Example

cargo run -p rx_core --example operator_map_into_example
#[derive(Debug)]
pub struct Foo(pub i32);

impl From<i32> for Foo {
  fn from(value: i32) -> Self {
    Foo(value)
  }
}

let _subscription = (1..=5)
    .into_observable()
    .map_into()
    .subscribe(PrintObserver::<Foo>::new("into_operator"));

Output:

into_operator - next: Foo(1)
into_operator - next: Foo(2)
into_operator - next: Foo(3)
into_operator - next: Foo(4)
into_operator - next: Foo(5)
into_operator - completed
into_operator - unsubscribed

operator_map_never

Re-type Never next/error channels into concrete types.

See Also

Example

cargo run -p rx_core --example operator_map_never_example
let _subscription_error = throw("error")
    .map_never()
    .subscribe(PrintObserver::<i32, &'static str>::new("map_never (next)"));

let _subscription_next = just(1)
    .map_never()
    .subscribe(PrintObserver::<i32, &'static str>::new("map_never (error)"));

let _subscription_both = empty()
    .map_never_both()
    .subscribe(PrintObserver::<i32, &'static str>::new("map_never_both"));

Output:

map_never (next) - error: "error"
map_never (next) - unsubscribed
map_never (error) - next: 1
map_never (error) - completed
map_never (error) - unsubscribed
map_never_both - completed
map_never_both - unsubscribed

operator_materialize

Turn next/error/complete into notification values.

See Also

Example

cargo run -p rx_core --example operator_materialize_example
let _subscription = (1..=5)
    .into_observable()
    .materialize()
    .subscribe(PrintObserver::new("materialize_operator"));

Output:

materialize_operator - next: Next(1)
materialize_operator - next: Next(2)
materialize_operator - next: Next(3)
materialize_operator - next: Next(4)
materialize_operator - next: Next(5)
materialize_operator - next: Complete
materialize_operator - completed
materialize_operator - unsubscribed

operator_merge_all

Subscribes to upstream observables and merges their emissions concurrently.

Example

cargo run -p rx_core --example operator_merge_all_example
#[derive(Clone, Debug)]
enum Either {
    Left,
    Right,
}

let mut upstream_subject = PublishSubject::<Either>::default();
let mut inner_left_subject = PublishSubject::<i32>::default();
let mut inner_right_subject = PublishSubject::<i32>::default();

let l = inner_left_subject.clone();
let r = inner_right_subject.clone();
let mut _subscription = upstream_subject
    .clone()
    .finalize(|| println!("finalize: upstream"))
    .tap_next(|n| println!("emit (source): {n:?}"))
    .map(move |next| match next {
        Either::Left => l.clone(),
        Either::Right => r.clone(),
    })
    .merge_all(usize::MAX, Never::map_into())
    .finalize(|| println!("finalize: downstream"))
    .subscribe(PrintObserver::new("merge_map"));

upstream_subject.next(Either::Left);
inner_left_subject.next(1);
inner_right_subject.next(2);
inner_left_subject.next(3);
inner_right_subject.next(4);
upstream_subject.next(Either::Right);
inner_left_subject.next(5);
inner_right_subject.next(6);
inner_left_subject.next(7);
inner_right_subject.next(8);
inner_left_subject.complete();
inner_left_subject.next(9);
inner_right_subject.next(10);
inner_right_subject.complete();
upstream_subject.complete();

Output:

emit (source): Left
merge_map - next: 1
merge_map - next: 3
emit (source): Right
merge_map - next: 5
merge_map - next: 6
merge_map - next: 7
merge_map - next: 8
merge_map - next: 10
merge_map - completed
finalize: downstream
finalize: upstream
merge_map - unsubscribed

operator_merge_map

Map each value to an inner observable and merge their emissions concurrently.

See Also

  • ConcatAllOperator - Subscribes to upstream observables one at a time in order.
  • MergeAllOperator - Subscribes to upstream observables and merges their emissions concurrently.
  • SwitchAllOperator - Switch to the newest inner observable, unsubscribing previous ones.
  • ExhaustAllOperator - Ignore new inner observables while one is active.
  • ConcatMapOperator - Map each value to an inner observable and subscribe to them one at a time in order.
  • SwitchMapOperator - Map each value to an inner observable and switch to the latest, unsubscribing previous ones.
  • ExhaustMapOperator - Map each value to an inner observable and ignore new ones while one is active.

Example

cargo run -p rx_core --example operator_merge_map_example
#[derive(Clone, Debug)]
enum Either {
    Left,
    Right,
}

let mut upstream_subject = PublishSubject::<Either>::default();
let mut inner_left_subject = PublishSubject::<i32>::default();
let mut inner_right_subject = PublishSubject::<i32>::default();

let l = inner_left_subject.clone();
let r = inner_right_subject.clone();
let mut _subscription = upstream_subject
    .clone()
    .finalize(|| println!("finalize: upstream"))
    .tap_next(|n| println!("emit (source): {n:?}"))
    .merge_map(
        move |next| match next {
            Either::Left => l.clone(),
            Either::Right => r.clone(),
        },
        usize::MAX,
        Never::map_into(),
    )
    .finalize(|| println!("finalize: downstream"))
    .subscribe(PrintObserver::new("merge_map"));

upstream_subject.next(Either::Left);
inner_left_subject.next(1);
inner_right_subject.next(2);
inner_left_subject.next(3);
inner_right_subject.next(4);
upstream_subject.next(Either::Right);
inner_left_subject.next(5);
inner_right_subject.next(6);
inner_left_subject.next(7);
inner_right_subject.next(8);
inner_left_subject.complete();
inner_left_subject.next(9);
inner_right_subject.next(10);
inner_right_subject.complete();
upstream_subject.complete();

Output:

emit (source): Left
merge_map - next: 1
merge_map - next: 3
emit (source): Right
merge_map - next: 5
merge_map - next: 6
merge_map - next: 7
merge_map - next: 8
merge_map - next: 10
merge_map - completed
finalize: downstream
finalize: upstream
merge_map - unsubscribed
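
To see why this run prints `6` and `8` while the concat_map run of the same script does not, the script can be replayed against a small plain-Rust model. Everything here (`Side`, `Step`, `Strategy`, `flatten_model`) is hypothetical and simplified: there is no concurrency limit, and an inner subject that completes while still queued is not modelled.

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Side {
    Left,
    Right,
}

#[derive(Clone, Copy, Debug)]
enum Step {
    Upstream(Side),      // upstream selects an inner subject
    Inner(Side, i32),    // an inner subject emits a value
    InnerComplete(Side), // an inner subject completes
}

#[derive(Clone, Copy, Debug)]
enum Strategy {
    Merge,
    Concat,
}

/// Replays `script` and returns the values that reach downstream.
fn flatten_model(script: &[Step], strategy: Strategy) -> Vec<i32> {
    let mut active: Vec<Side> = Vec::new(); // currently subscribed inners
    let mut queued: VecDeque<Side> = VecDeque::new(); // waiting inners (concat only)
    let mut output = Vec::new();
    for step in script {
        match *step {
            Step::Upstream(side) => match strategy {
                // Merge subscribes right away, alongside any active inner.
                Strategy::Merge => active.push(side),
                // Concat subscribes only when nothing else is active.
                Strategy::Concat if active.is_empty() => active.push(side),
                Strategy::Concat => queued.push_back(side),
            },
            Step::Inner(side, value) => {
                // Values from inners nobody is subscribed to are lost.
                if active.contains(&side) {
                    output.push(value);
                }
            }
            Step::InnerComplete(side) => {
                let was_active = active.contains(&side);
                active.retain(|current| *current != side);
                if was_active {
                    if let Some(next) = queued.pop_front() {
                        active.push(next);
                    }
                }
            }
        }
    }
    output
}
```

Replaying the Left/Right script with `Strategy::Merge` gives `1, 3, 5, 6, 7, 8, 10`; with `Strategy::Concat` the right subject is only subscribed after the left one completes, which gives `1, 3, 5, 7, 10`.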

operator_observe_on

The observe_on operator re-emits upstream next signals on the provided scheduler.

Upstream completion and cancellation happen immediately when there are no pending scheduled values, otherwise they are deferred until scheduled work drains.

Upstream errors are forwarded immediately; any pending scheduled values are skipped because downstream closes.

See Also

Example

cargo run -p rx_core --example operator_observe_on_example
use std::time::Duration;

use rx_core::prelude::*;
use rx_core_testing::MockExecutor;

fn main() {
    let mut executor = MockExecutor::new_with_logging();
    let scheduler = executor.get_scheduler_handle();

    let _subscription = (1..=3)
        .into_observable()
        .observe_on(scheduler)
        .subscribe(PrintObserver::new("observe_on_operator"));

    executor.tick(Duration::from_millis(1));
}

Output:

Ticking... (1ms)
observe_on_operator - next: 1
observe_on_operator - next: 2
observe_on_operator - next: 3
observe_on_operator - completed
observe_on_operator - unsubscribed

operator_subscribe_on

The subscribe_on operator schedules the subscription to the upstream observable on the provided scheduler.

This only affects when the upstream subscription starts. It does not alter when upstream next, error, or complete signals are emitted.

The subscription can be delayed with subscribe_on_with_delay.

See Also

Example

cargo run -p rx_core --example operator_subscribe_on_example
use std::time::Duration;

use rx_core::prelude::*;
use rx_core_testing::MockExecutor;

fn main() {
    let mut executor = MockExecutor::new_with_logging();
    let scheduler = executor.get_scheduler_handle();

    let _subscription = (1..=3)
        .into_observable()
        .subscribe_on(scheduler)
        .subscribe(PrintObserver::new("subscribe_on_operator"));

    executor.tick(Duration::from_millis(0));
}

Output:

Ticking... (0ns)
subscribe_on_operator - next: 1
subscribe_on_operator - next: 2
subscribe_on_operator - next: 3
subscribe_on_operator - completed
subscribe_on_operator - unsubscribed

operator_throttle_time

The throttle_time operator limits the frequency of downstream emissions by emitting an upstream value, then suppressing subsequent emissions until the duration elapses.

When the output is set to LeadingOnly, the first upstream value in a throttle window is emitted immediately. When the output is set to TrailingOnly, the most recent upstream value observed during the throttle window is emitted when it ends. The default LeadingAndTrailing setting emits both the first and the most recent values in each throttle window.

Upstream completion and cancellation take effect immediately when no trailing value is pending. Otherwise, the operator completes or cancels once the trailing value has been emitted.

Upstream errors are immediately propagated downstream, cancelling any pending throttled value.

Options

Use ThrottleTimeOptions to configure duration and output behavior.

  • duration: The throttle window duration. Default: 1s.
  • output: Controls which emissions are produced in each throttle window. Default: ThrottleOutputBehavior::LeadingAndTrailing. Possible values: ThrottleOutputBehavior::LeadingOnly, ThrottleOutputBehavior::TrailingOnly, ThrottleOutputBehavior::LeadingAndTrailing.
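
The leading-edge part of this behavior can be sketched over timestamped arrivals. The model below covers only what LeadingOnly describes and leaves trailing emissions out. `throttle_leading_model` is a hypothetical plain-Rust helper, and treating an arrival exactly at the end of a window as the start of a new one is an assumption.

```rust
/// `arrivals` are `(arrival_ms, value)` pairs sorted by time. The first value
/// outside any window is emitted and opens a window of `window_ms`; values
/// arriving inside an open window are suppressed.
fn throttle_leading_model<T: Clone>(arrivals: &[(u64, T)], window_ms: u64) -> Vec<T> {
    let mut window_ends_at: Option<u64> = None;
    let mut emitted = Vec::new();
    for (arrived_at, value) in arrivals {
        let window_open = window_ends_at.map_or(false, |end| *arrived_at < end);
        if !window_open {
            emitted.push(value.clone());
            window_ends_at = Some(arrived_at + window_ms);
        }
    }
    emitted
}
```

With a 500 ms window, arrivals at 0, 100, 499, 500 and 1200 ms produce emissions for the values that arrived at 0, 500 and 1200 ms.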

See Also

Example

cargo run -p rx_core --example operator_throttle_time_example
let mut executor = MockExecutor::new_with_logging();
let scheduler = executor.get_scheduler_handle();

let _subscription = interval(
    IntervalObservableOptions {
        duration: Duration::from_millis(1),
        max_emissions_per_tick: 1000,
        ..Default::default()
    },
    scheduler.clone(),
)
.throttle_time(
    ThrottleTimeOptions::new(Duration::from_millis(500)),
    scheduler,
)
.subscribe(PrintObserver::new("throttle_time_operator"));

for _ in 0..10 {
    executor.tick(Duration::from_millis(100));
}

Output:

Ticking... (100ms)
throttle_time_operator - next: 0
Ticking... (100ms)
Ticking... (100ms)
Ticking... (100ms)
Ticking... (100ms)
throttle_time_operator - next: 499
Ticking... (100ms)
Ticking... (100ms)
Ticking... (100ms)
Ticking... (100ms)
Ticking... (100ms)
throttle_time_operator - next: 999
throttle_time_operator - unsubscribed

operator_on_next

Invoke a callback for each value that can also decide whether to forward it.

  • Returning true allows the value to be forwarded to the destination observer.
  • Returning false prevents the value from being forwarded.

Like filter, but with access to the destination observer!
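
A small model of that contract in plain Rust, with a `Vec` standing in for the destination observer. `on_next_model` is a hypothetical name and the signature is an assumption made for the sketch.

```rust
/// The callback sees each value together with the destination. It may push
/// extra values itself, and its return value decides whether the original
/// value is forwarded afterwards.
fn on_next_model<T, F>(source: impl IntoIterator<Item = T>, mut callback: F) -> Vec<T>
where
    T: Copy,
    F: FnMut(T, &mut Vec<T>) -> bool,
{
    let mut destination = Vec::new();
    for value in source {
        if callback(value, &mut destination) {
            destination.push(value);
        }
    }
    destination
}
```

Because the callback runs before the value is forwarded, anything it pushes lands in front of the original value, which matches the interleaved order in the example output.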

See Also

Example

cargo run -p rx_core --example operator_on_next_example
let _subscription = (1..=5)
    .into_observable()
    .on_next(|next, destination| {
        destination.next(next * 99);
        true
    })
    .subscribe(PrintObserver::new("on_next_operator"));

Output:

on_next_operator - next: 99
on_next_operator - next: 1
on_next_operator - next: 198
on_next_operator - next: 2
on_next_operator - next: 297
on_next_operator - next: 3
on_next_operator - next: 396
on_next_operator - next: 4
on_next_operator - next: 495
on_next_operator - next: 5
on_next_operator - completed
on_next_operator - unsubscribed

operator_on_subscribe

Run a callback when a subscription is established.

See Also

  • TapOperator - Mirror all signals into another observer.
  • TapNextOperator - Run a callback for each next value while letting signals pass through.
  • OnNextOperator - Invoke a callback for each value that can also decide whether to forward it.
  • FinalizeOperator - Execute cleanup when the observable finishes or unsubscribes.

Example

cargo run -p rx_core --example operator_on_subscribe_example
let _subscription = (1..=5)
    .into_observable()
    .on_subscribe(|destination| destination.next(99))
    .subscribe(PrintObserver::new("on_subscribe_operator"));

Output:

on_subscribe_operator - next: 99
on_subscribe_operator - next: 1
on_subscribe_operator - next: 2
on_subscribe_operator - next: 3
on_subscribe_operator - next: 4
on_subscribe_operator - next: 5
on_subscribe_operator - completed
on_subscribe_operator - unsubscribed

operator_pairwise

Emit the previous and current values together.

See Also

Example

cargo run -p rx_core --example operator_pairwise_example
let _subscription = (1..=4)
    .into_observable()
    .pairwise()
    .subscribe(PrintObserver::new("pairwise_operator"));

Output:

pairwise_operator - next: [1, 2]
pairwise_operator - next: [2, 3]
pairwise_operator - next: [3, 4]
pairwise_operator - completed
pairwise_operator - unsubscribed

operator_reduce

Fold values and emit only the final accumulator on completion.

See Also

  • ScanOperator - Accumulate state and emit every intermediate result.

Example

cargo run -p rx_core --example operator_reduce_example
let _subscription = (0..=10)
    .into_observable()
    .reduce(|acc, next| acc + next, 0)
    .subscribe(PrintObserver::new("reduce_operator"));

Output:

reduce_operator - next: 55
reduce_operator - completed
reduce_operator - unsubscribed

operator_retry

Book Page - Operator Source - Subscriber Source

Resubscribe on error up to the configured retry count.

See Also

Example

Run the example with:

cargo run -p rx_core --example operator_retry_example
let mut retried = concat((
    (0..=2).into_observable().map_never(),
    throw("error").map_never(),
))
.retry(2);

let _s1 = retried.subscribe(PrintObserver::new("retry_operator"));

Output:

retry_operator - next: 0
retry_operator - next: 1
retry_operator - next: 2
retry_operator - next: 0
retry_operator - next: 1
retry_operator - next: 2
retry_operator - next: 0
retry_operator - next: 1
retry_operator - next: 2
retry_operator - error: "error"
retry_operator - unsubscribed

operator_scan

Accumulate state and emit every intermediate result.

See Also

  • ReduceOperator - Fold values and emit only the final accumulator on completion.

Example

cargo run -p rx_core --example operator_scan_example
let _subscription = (0..=10)
    .into_observable()
    .scan(|acc, next| acc + next, 0)
    .subscribe(PrintObserver::new("scan_operator"));

Output:

scan_operator - next: 0
scan_operator - next: 1
scan_operator - next: 3
scan_operator - next: 6
scan_operator - next: 10
scan_operator - next: 15
scan_operator - next: 21
scan_operator - next: 28
scan_operator - next: 36
scan_operator - next: 45
scan_operator - next: 55
scan_operator - completed

operator_share

Book Page - Operator Source - Subscriber Source

Multicast a source through a connector so downstream subscribers share one upstream subscription.

See Also

  • ConnectableObservable - Maintains an internal connector subject that subscribes to a source observable only when the connect function is called on it. Subscribers of the ConnectableObservable subscribe to this internal connector.

Example

cargo run -p rx_core --example operator_share_example
use std::time::Duration;

use rx_core::prelude::*;
use rx_core_testing::MockExecutor;

fn main() {
  let mut executor = MockExecutor::new_with_logging();
  let scheduler = executor.get_scheduler_handle();
  let shared_interval = interval(
    IntervalObservableOptions {
      duration: Duration::from_secs(1),
      max_emissions_per_tick: 10,
      ..Default::default()
    },
    scheduler,
  )
  .finalize(|| println!("shared interval: unsubscribed"))
  .tap_next(|n| println!("shared interval next: {n}"))
  .share::<ProvideWithDefault<PublishSubject<_, _>>>(ConnectableOptions::default());

  // No subscriptions yet, these will not advance the interval as there isn't one
  executor.tick(Duration::from_secs(7));

  let _s1 = shared_interval
    .clone()
    .subscribe(PrintObserver::new("share_operator_1"));

  // A subscription was established, now that share is hot, there is an active interval subscription!
  executor.tick(Duration::from_secs(4));

  let _s2 = shared_interval
    .clone()
    .subscribe(PrintObserver::new("share_operator_2"));

  // A subscription was already hot, the same interval output is received by the second subscription too
  executor.tick(Duration::from_secs(2));
}

Output:

Ticking... (7s)
Ticking... (4s)
shared interval next: 0
share_operator_1 - next: 0
shared interval next: 1
share_operator_1 - next: 1
shared interval next: 2
share_operator_1 - next: 2
shared interval next: 3
share_operator_1 - next: 3
Ticking... (2s)
shared interval next: 4
share_operator_2 - next: 4
share_operator_1 - next: 4
shared interval next: 5
share_operator_2 - next: 5
share_operator_1 - next: 5
share_operator_2 - unsubscribed
share_operator_1 - unsubscribed
shared interval: unsubscribed

operator_skip

Drop the first n values.

See Also

Example

cargo run -p rx_core --example operator_skip_example
let _subscription = (1..=5)
    .into_observable()
    .skip(2)
    .subscribe(PrintObserver::new("skip_operator"));

Output:

skip_operator - next: 3
skip_operator - next: 4
skip_operator - next: 5
skip_operator - completed
skip_operator - unsubscribed

operator_start_with

Emit a value first when subscribing to the source.

See Also

Example

cargo run -p rx_core --example operator_start_with_example
let _subscription = (1..=5)
    .into_observable()
    .start_with(99)
    .subscribe(PrintObserver::new("start_with_operator"));

Output:

start_with_operator - next: 99
start_with_operator - next: 1
start_with_operator - next: 2
start_with_operator - next: 3
start_with_operator - next: 4
start_with_operator - next: 5
start_with_operator - completed
start_with_operator - unsubscribed

operator_switch_all

Example

cargo run -p rx_core --example operator_switch_all_example

operator_switch_map

Map each value to an inner observable and switch to the latest, unsubscribing previous ones.

See Also

  • ConcatAllOperator - Subscribes to upstream observables one at a time in order.
  • MergeAllOperator - Subscribes to upstream observables and merges their emissions concurrently.
  • SwitchAllOperator - Switch to the newest inner observable, unsubscribing previous ones.
  • ExhaustAllOperator - Ignore new inner observables while one is active.
  • ConcatMapOperator - Map each value to an inner observable and subscribe to them one at a time in order.
  • MergeMapOperator - Map each value to an inner observable and merge their emissions concurrently.
  • ExhaustMapOperator - Map each value to an inner observable and ignore new ones while one is active.

Example

cargo run -p rx_core --example operator_switch_map_operator_example
let _subscription = (1..=3)
    .into_observable()
    .switch_map(|next| IteratorObservable::new(1..=next), Never::map_into())
    .subscribe(PrintObserver::new("switch_map"));

Output:

switch_map - next: 1
switch_map - next: 1
switch_map - next: 2
switch_map - next: 1
switch_map - next: 2
switch_map - next: 3
switch_map - completed
switch_map - unsubscribed

operator_take

Emit only the first n values, then complete.

See Also

Example

cargo run -p rx_core --example operator_take_example
let _subscription = (1..=5)
    .into_observable()
    .take(2)
    .subscribe(PrintObserver::new("take_operator"));

Output:

take_operator - next: 1
take_operator - next: 2
take_operator - completed
take_operator - unsubscribed

operator_tap

Book Page - Operator Source - Subscriber Source

The tap operator lets you forward upstream values to a destination observer.

The destination could be anything, a PrintObserver to log upstream values to the console, or even a subject to trigger another pipeline.

Good to know

  • Keep in mind that the destination observer passed in will not get upgraded [1], meaning a tap operator will never call unsubscribe on the destination, even if it’s a subscriber that upgrades to itself and has an unsubscribe implementation. However, the error and complete signals are forwarded. If you want to avoid forwarding error and complete, use tap_next instead.

See Also

  • TapNextOperator - Run a callback for each next value while letting signals pass through.
  • OnNextOperator - Invoke a callback for each value that can also decide whether to forward it.
  • OnSubscribeOperator - Run a callback when a subscription is established.
  • FinalizeOperator - Execute cleanup when the observable finishes or unsubscribes.

Example

cargo run -p rx_core --example operator_tap_example
(1..=3)
    .into_observable()
    .tap(PrintObserver::new("tap_destination"))
    .subscribe(PrintObserver::new("tap_operator"));

Output:

tap_destination - next: 1
tap_operator - next: 1
tap_destination - next: 2
tap_operator - next: 2
tap_destination - next: 3
tap_operator - next: 3
tap_destination - completed
tap_operator - completed
tap_operator - unsubscribed

  1. Documentation on UpgradeableObserver

operator_tap_next

Run a callback for each next value while letting signals pass through.

See Also

  • TapOperator - Mirror all signals into another observer.
  • OnNextOperator - Invoke a callback for each value that can also decide whether to forward it.
  • OnSubscribeOperator - Run a callback when a subscription is established.
  • FinalizeOperator - Execute cleanup when the observable finishes or unsubscribes.

Example

cargo run -p rx_core --example operator_tap_next_example
let _subscription = (1..=5)
    .into_observable()
    .tap_next(|next| println!("hello {next}"))
    .subscribe(PrintObserver::new("tap_operator"));

Output:

hello 1
tap_operator - next: 1
hello 2
tap_operator - next: 2
hello 3
tap_operator - next: 3
hello 4
tap_operator - next: 4
hello 5
tap_operator - next: 5
tap_operator - completed
tap_operator - unsubscribed

operator_with_latest_from

Example

cargo run -p rx_core --example operator_with_latest_from_example
let mut source = PublishSubject::<usize, &'static str>::default();
let mut inner = PublishSubject::<&'static str, &'static str>::default();

let _subscription = source
    .clone()
    .with_latest_from(inner.clone())
    .subscribe(PrintObserver::new("with_latest_from_operator"));

source.next(1);
inner.next("hello");
source.next(2);
source.next(3);
source.next(4);
inner.next("bello");
source.next(5);
inner.error("error");

Output:

with_latest_from_operator - next: (2, "hello")
with_latest_from_operator - next: (3, "hello")
with_latest_from_operator - next: (4, "hello")
with_latest_from_operator - next: (5, "bello")
with_latest_from_operator - error: "error"
with_latest_from_operator - unsubscribed

Schedulers

scheduler_ticking

Subjects

subject_async

Reduces observed values into one and emits it to active subscribers once completed. Once completed, it also replays the result to late subscribers.

See Also

  • PublishSubject - Forwards observed signals to active subscribers without replaying values, but terminal state is replayed.
  • BehaviorSubject - Always holds a value that is replayed to late subscribers.
  • ReplaySubject - Buffers the last N values and replays them to late subscribers.
  • ProvenanceSubject - BehaviorSubject that also stores an additional filtering value to track provenance.

Example

Run the example with:

cargo run -p rx_core --example subject_async_example
use rx_core::prelude::*;

fn main() {
    let mut subject = AsyncSubject::<i32>::default();

    let mut _subscription_1 = subject
        .clone()
        .subscribe(PrintObserver::<i32>::new("async_subject sub_1"));

    subject.next(1);
    subject.next(2);

    let mut _subscription_2 = subject
        .clone()
        .subscribe(PrintObserver::<i32>::new("async_subject sub_2"));

    subject.next(3);
    subject.complete();

    let mut _subscription_3 = subject
        .clone()
        .subscribe(PrintObserver::<i32>::new("async_subject sub_3"));
}

Output:

async_subject sub_1 - next: 3
async_subject sub_2 - next: 3
async_subject sub_1 - completed
async_subject sub_1 - unsubscribed
async_subject sub_2 - completed
async_subject sub_2 - unsubscribed
async_subject sub_3 - next: 3
async_subject sub_3 - completed
async_subject sub_3 - unsubscribed

subject_behavior

Always holds a value that is replayed to late subscribers.

See Also

  • PublishSubject - Forwards observed signals to active subscribers without replaying values, but terminal state is replayed.
  • AsyncSubject - Reduces observed values into one and emits it on completion, replaying the result to late subscribers.
  • ReplaySubject - Buffers the last N values and replays them to late subscribers.
  • ProvenanceSubject - BehaviorSubject that also stores an additional filtering value to track provenance.

Example

Run the example with:

cargo run -p rx_core --example subject_behavior_example
use rx_core::prelude::*;

fn main() {
    let mut subject = BehaviorSubject::<i32>::new(10);

    let mut hello_subscription = subject
        .clone()
        .subscribe(PrintObserver::<i32>::new("hello"));

    subject.next(11);

    let _s1 = subject
        .clone()
        .map(|next| next * 2)
        .subscribe(PrintObserver::<i32>::new("hi double"));

    subject.next(12);
    hello_subscription.unsubscribe();
    subject.next(13);
    subject.complete();

    let mut _completed_subscription = subject
        .clone()
        .subscribe(PrintObserver::<i32>::new("hello_completed"));
}

Output:

hello - next: 10
hello - next: 11
hi double - next: 22
hi double - next: 24
hello - next: 12
hello - unsubscribed
hi double - next: 26
hi double - completed
hi double - unsubscribed
hello_completed - next: 13
hello_completed - completed
hello_completed - unsubscribed

subject_provenance

BehaviorSubject that also stores an additional filtering value to track provenance so subscribers can filter by origin.

See Also

  • PublishSubject - Forwards observed signals to active subscribers without replaying values, but terminal state is replayed.
  • AsyncSubject - Reduces observed values into one and emits it on completion, replaying the result to late subscribers.
  • BehaviorSubject - Always holds a value that is replayed to late subscribers.
  • ReplaySubject - Buffers the last N values and replays them to late subscribers.

Example

Run the example with:

cargo run -p rx_core --example subject_provenance_example
use rx_core::prelude::*;

#[derive(PartialEq, Clone, Debug)]
enum ExampleProvenance {
    Foo,
    Bar,
}

fn main() {
    let mut subject = ProvenanceSubject::<ExampleProvenance, usize>::new(
        10,
        ExampleProvenance::Foo,
    );

    let _all_subscription = subject
        .clone()
        .all()
        .subscribe(PrintObserver::<usize>::new("provenance_ignored"));

    let _bar_subscription = subject
        .clone()
        .only_by_provenance(ExampleProvenance::Bar)
        .subscribe(PrintObserver::<usize>::new("provenance_bar"));

    let _foo_subscription = subject
        .clone()
        .only_by_provenance(ExampleProvenance::Foo)
        .subscribe(PrintObserver::<usize>::new("provenance_foo"));

    subject.next((1, ExampleProvenance::Foo));
    subject.next((2, ExampleProvenance::Bar));
    subject.next((3, ExampleProvenance::Foo));
    subject.next((4, ExampleProvenance::Bar));
}

Output:

provenance_ignored - next: 10
provenance_foo - next: 10
provenance_ignored - next: 1
provenance_foo - next: 1
provenance_bar - next: 2
provenance_ignored - next: 2
provenance_ignored - next: 3
provenance_foo - next: 3
provenance_bar - next: 4
provenance_ignored - next: 4
provenance_foo - unsubscribed
provenance_bar - unsubscribed
provenance_ignored - unsubscribed

subject_publish

Forwards observed signals to all active subscribers. Does not replay values to late subscribers, but always replays terminal state.

See Also

  • AsyncSubject - Reduces observed values into one and emits it on completion, replaying the result to late subscribers.
  • BehaviorSubject - Always holds a value that is replayed to late subscribers.
  • ReplaySubject - Buffers the last N values and replays them to late subscribers.
  • ProvenanceSubject - BehaviorSubject that also stores an additional filtering value to track provenance.

Example

cargo run -p rx_core --example subject_publish_example
use rx_core::prelude::*;

fn main() {
    let mut subject = PublishSubject::<i32>::default();
    subject.next(1);

    let mut subscription = subject
        .clone()
        .subscribe(PrintObserver::<i32>::new("subject_example"));
    subject.next(2);
    subject.next(3);
    subscription.unsubscribe();
    subject.next(4);
    subject.complete();

    let _subscription_2 = subject
        .clone()
        .subscribe(PrintObserver::<i32>::new("subject_example_2"));
}

Output:

subject_example - next: 2
subject_example - next: 3
subject_example - unsubscribed
subject_example_2 - completed
subject_example_2 - unsubscribed

subject_replay

Buffers the last N values and replays them to late subscribers.

See Also

  • PublishSubject - Forwards observed signals to active subscribers without replaying values, but terminal state is replayed.
  • AsyncSubject - Reduces observed values into one and emits it on completion, replaying the result to late subscribers.
  • BehaviorSubject - Always holds a value that is replayed to late subscribers.
  • ProvenanceSubject - BehaviorSubject that also stores an additional filtering value to track provenance.

Example

cargo run -p rx_core --example subject_replay_example
use rx_core::prelude::*;

fn main() {
    let mut subject = ReplaySubject::<2, i32>::default();

    let _s = subject
        .clone()
        .subscribe(PrintObserver::<i32>::new("hello"));

    subject.next(1);
    subject.next(2);
    subject.next(3);

    let _s2 = subject
        .clone()
        .subscribe(PrintObserver::<i32>::new("hi"));

    subject.next(4);
    subject.next(5);
}

Output:

hello - next: 1
hello - next: 2
hello - next: 3
hi - next: 2
hi - next: 3
hi - next: 4
hello - next: 4
hi - next: 5
hello - next: 5
hi - unsubscribed
hello - unsubscribed

Development

Writing Tests

Integration Testing & Contract Adherence

Beyond writing tests for the main purpose of an observable or operator, you must also include tests for other standard behavior, such as making sure teardowns were forwarded and executed on unsubscribe, and that resources are disposed even when downstream unsubscribed after an action.

The more complex an observable/operator is, the easier it is to accidentally break an expected standard behavior. That is why it’s important to test for all of these requirements, even if the observable/operator itself does not need to consider them in its implementation due to its simplicity.

If you develop an observable/operator outside of this repository, you do not need to follow the standard test structure this repository is using, but it is recommended. To ensure your observable/operator behaves as expected, it’s enough to follow and test for the Runtime Contracts.

Test Organization

See also: Rust Book / Test Organization

Contract testing is integration testing, and it should be done from the same “outside” perspective as the users would use your observable/operator.

  • Integration tests should be implemented in the tests folder of your crate.
  • Unit tests can be put anywhere; having them in the same file as the thing tested is preferred, as it gives you access to its private internals for assertions.

Code Coverage & Dead Code Elimination

Always start with integration testing first! This gives you an opportunity to see if your implementation has any dead code: code that doesn’t even run while still operating correctly.

Since at this point you’ve only tested your observable/operator from the outside, now you can evaluate what to do with code that was not covered by your tests depending on whether or not it’s even possible to reach it:

  • If it’s possible and is related to your logic, you’ve missed testing a feature!
  • If it’s possible and is not related to your logic but to standard behavior, and the contract tests didn’t cover it, then you found an edge case and a new contract should be added!
  • If it’s not possible to reach it, deleting it depends on whether that piece of code has any purpose when used somewhere other than your operator/observable. If it’s a subscriber exported by your crate, someone else may write another operator with it that does make that piece of code reachable. Depending on this, you may either write a unit test for it or delete the useless code.

What 100% Test Coverage Means

In general

It’s very important to recognize that 100% test coverage does not mean your project is completely bug free! It means that it does not have any dead code, and that every feature is at least partially tested!

It’s still very useful, as it gives you confidence that new changes will not break existing features, and your users can be confident that at least one test exists for any feature they may end up using.

Code coverage should be thought of as an outward-facing metric for users. A confidence score of sorts. Therefore it’s not required to include code in the coverage that will never reach a user. For example, private crates that are usually dev tools are irrelevant for the user and can safely be excluded from code coverage.

Requiring it at a CI/CD level also gives you some benefits, not just in teams but for solo projects as well:

  • It forces you to write tests for every feature, eliminating the possibility of forgetting to test something! (Again, it does not mean you have covered all edge cases, only that it’s at least tested!)

Integration Tests

Each test file should organize tests in modules in a BDD fashion, where each module describes a scenario like when_option_is_false or when_already_unsubscribed etc.

This keeps the actual test function names short while the full path stays readable in the test output.
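A minimal sketch of this layout follows. It assumes a hypothetical `take_first` helper standing in for the observable/operator under test (it is not part of rx_core), and the scenario module names are made up for illustration:

```rust
/// Hypothetical stand-in for the thing under test: keeps the first `count` items.
pub fn take_first(values: &[i32], count: usize) -> Vec<i32> {
    values.iter().copied().take(count).collect()
}

// Each module describes one scenario; the functions inside only state the expectation.
#[cfg(test)]
mod when_count_is_zero {
    use super::take_first;

    #[test]
    fn should_emit_nothing() {
        assert!(take_first(&[1, 2, 3], 0).is_empty());
    }
}

#[cfg(test)]
mod when_count_exceeds_length {
    use super::take_first;

    #[test]
    fn should_emit_everything() {
        assert_eq!(take_first(&[1, 2, 3], 10), vec![1, 2, 3]);
    }
}

fn main() {
    // Running as a binary just exercises the helper once.
    println!("{:?}", take_first(&[1, 2, 3], 2));
}
```

With this layout the test runner lists entries such as `when_count_is_zero::should_emit_nothing`, so the scenario is visible without repeating it in every function name.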

Contracts

These contracts are intended for people who write observables or operators to ensure correct operation.

This isn’t a guide on how to create your own observables and operators, but a set of additional rules to check after you have made one.

To learn the basics of how to write your own observables and operators, see Writing Observables and Writing Operators.

To learn how to write them in the same fashion as rx_core and rx_bevy do, with separate crates for every observable/operator and an aggregator crate, see contributing.md.

Every observable and operator must uphold these invariants to ensure the expected runtime behavior. If they aren’t met, it should be treated as a bug!

Contracts are identified by their “contract code”, always starting with rx_contract_. Each contract features one or more verifications identified by “verification codes”, always starting with rx_verify_.

Tests

It is highly advised to have at least one test for each contract defined here wherever applicable!

Each contract should have its own test with the same name as the contract. The test should feature individual assertions with the verification code as part of the failure message.

rx_core_testing contains test harnesses that can test for some of these contracts, saving time implementing the tests, ensuring every verification and extra assertion is made.
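As a rough illustration of that naming scheme, the sketch below checks a toy subscriber against rx_contract_closed_after_error. `RecordingSubscriber` and `Notification` are hypothetical types written for this example, not the rx_core_testing harness, and in a real test file the function would carry `#[test]` instead of being called from `main`:

```rust
/// Signals recorded by the toy subscriber below.
#[derive(Debug, Clone, PartialEq)]
pub enum Notification {
    Next(i32),
    Error(String),
}

/// Hypothetical recording subscriber; it closes itself once it emits an error.
#[derive(Default)]
pub struct RecordingSubscriber {
    pub notifications: Vec<Notification>,
    closed: bool,
}

impl RecordingSubscriber {
    pub fn next(&mut self, value: i32) {
        if !self.closed {
            self.notifications.push(Notification::Next(value));
        }
    }

    pub fn error(&mut self, error: &str) {
        if !self.closed {
            self.notifications.push(Notification::Error(error.to_string()));
            self.closed = true;
        }
    }

    pub fn is_closed(&self) -> bool {
        self.closed
    }
}

/// Named after the contract; every assertion message carries its verification code.
pub fn rx_contract_closed_after_error() {
    let mut subscriber = RecordingSubscriber::default();
    subscriber.next(1);
    subscriber.error("error");

    assert_eq!(
        subscriber.notifications.last(),
        Some(&Notification::Error("error".to_string())),
        "rx_verify_errored"
    );
    assert!(subscriber.is_closed(), "rx_verify_closed");

    // Signals sent after closing must not produce new emissions.
    let emitted_before = subscriber.notifications.len();
    subscriber.next(2);
    subscriber.error("late");
    assert_eq!(
        subscriber.notifications.len(),
        emitted_before,
        "rx_verify_no_new_notification_after_closed"
    );
}

fn main() {
    rx_contract_closed_after_error();
    println!("rx_contract_closed_after_error passed");
}
```

When one of these assertions fails, the verification code in the panic message points straight at the broken part of the contract.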

rx_contract_closed_after_error

Applies to:

  • Observables
  • Operators
  • Subscribers

Once a subscriber emits an Error notification, it is considered “errored”, and it should be closed and its teardowns executed.

A subscriber is considered errored when it emits an error signal, not when it receives one! Some operators are designed to handle errors.

Test must verify:

  • rx_verify_errored: An Error notification was observed.
  • rx_verify_closed: is_closed returns true after an Error notification was observed.
  • rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
  • If Observable or Operator:
    • rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
    • rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
  • If Operator:
    • rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
  • If there are input observables:
    • rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
  • If Scheduled:
    • rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.

rx_contract_closed_after_complete

Applies to:

  • Observables
  • Operators
  • Subscribers

Once an observable or a subscriber emits a Complete notification, it is considered “completed”, and it should be closed and its teardowns executed.

A subscriber is considered completed when it emits a complete signal, not when it receives one! Some operators complete later, for example: delay.

Test must verify:

  • rx_verify_completed: A Complete notification was observed.
  • rx_verify_closed: is_closed returns true after a Complete notification was observed.
  • rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
  • If Observable or Operator:
    • rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
    • rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
  • If Operator:
    • rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
  • If there are input observables:
    • rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
  • If Scheduled:
    • rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.

rx_contract_closed_after_unsubscribe

Applies to:

  • Observables
  • Operators
  • Subscribers

An observable is not considered completed or errored when its subscription is unsubscribed. It’s a cancellation.

Test must verify:

  • rx_verify_unsubscribed: An Unsubscribe notification was observed.
  • rx_verify_closed: is_closed returns true after an Unsubscribe notification was observed.
  • rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
  • If Observable or Operator:
    • rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
    • rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
  • If Operator:
    • rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
  • If there are input observables:
    • rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
  • If Scheduled:
    • rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.

rx_contract_closed_if_downstream_closes_early

Applies to:

  • Observables
  • Operators
  • Subscribers

A subscription must be closed if a downstream operator like take(1+) closes it early.

Test must verify:

  • rx_verify_closed: is_closed returns true after an Unsubscribe notification was observed.
  • rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
  • If Observable or Operator:
    • rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
    • rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
  • If Operator:
    • rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
  • If there are input observables:
    • rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
  • If Scheduled:
    • rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.

rx_contract_closed_if_downstream_closes_immediately

Applies to:

  • Observables
  • Operators
  • Subscribers

A subscription must be closed if a downstream operator like take(0) closes it immediately.

Test must verify:

  • rx_verify_closed: is_closed returns true after an Unsubscribe notification was observed.
  • rx_verify_no_new_notification_after_closed: After closing, a new next, error, complete or unsubscribe event must not result in a new emission.
  • If Observable or Operator:
    • rx_verify_subscription_teardowns_executed: Teardowns added to the subscription are executed.
    • rx_verify_downstream_teardowns_executed: Teardowns added by a finalize downstream of the operator should also be executed.
  • If Operator:
    • rx_verify_upstream_teardowns_executed: Teardowns added by a finalize upstream of the operator should also be executed.
  • If there are input observables:
    • rx_verify_input_observable_teardowns_executed: Teardowns added by an input observable (using finalize) must also be executed.
  • If Scheduled:
    • rx_verify_scheduler_is_empty: No work should remain in the scheduler’s executor once the subscription is unsubscribed. Both normal and invoked work should be cancelled.

rx_contract_immediate_completion

Applies to:

  • Observables
  • Operators (if they can complete on their own)
  • Subscribers (if they can complete on their own)

Once it is known that further emissions are impossible, completion should be immediate.

For example, knowing when an iterator is finished is trivial: after the last next, a complete must immediately follow.

But combination observables have to deduce their own completion based on the Observables they combine:

  • CombineLatestObservable, once primed, completes only when all of its inner observables have finished emitting values! Before it’s primed, it completes when any of its inner observables completes or unsubscribes, as priming then becomes impossible.
  • ZipObservable completes when any of its inner observables have finished emitting values!
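The two rules above can be written as plain predicates. This is only a sketch: `InnerState` is a hypothetical snapshot type, not an rx_core type. It also makes two interpretive assumptions: "priming becomes impossible" is read as "an inner observable finished before emitting anything", and for zip, "finished emitting values" is read as "finished with nothing left buffered".

```rust
/// Hypothetical snapshot of one inner observable; not an rx_core type.
#[derive(Clone, Copy, Debug)]
pub struct InnerState {
    /// The inner observable has emitted at least one value.
    pub has_emitted: bool,
    /// The inner observable has completed or was unsubscribed.
    pub finished: bool,
    /// Values received but not yet paired (only relevant for zip).
    pub buffered: usize,
}

/// Assumed combine-latest rule: once primed, wait for every inner to finish;
/// before that, complete as soon as priming can no longer happen.
pub fn combine_latest_should_complete(inners: &[InnerState]) -> bool {
    let primed = inners.iter().all(|inner| inner.has_emitted);
    if primed {
        inners.iter().all(|inner| inner.finished)
    } else {
        inners
            .iter()
            .any(|inner| inner.finished && !inner.has_emitted)
    }
}

/// Assumed zip rule: complete once any inner is finished and has nothing left to pair.
pub fn zip_should_complete(inners: &[InnerState]) -> bool {
    inners
        .iter()
        .any(|inner| inner.finished && inner.buffered == 0)
}

fn main() {
    let done = InnerState { has_emitted: true, finished: true, buffered: 0 };
    let live = InnerState { has_emitted: true, finished: false, buffered: 0 };

    // One finished inner is enough for zip, but not for a primed combine-latest.
    println!("combine_latest: {}", combine_latest_should_complete(&[done, live]));
    println!("zip: {}", zip_should_complete(&[done, live]));
}
```

Keeping the decision in a small predicate like this makes it easy to assert both directions: completing immediately when nothing more can be emitted, and staying open while another input can still trigger emissions.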

Test must verify:

  • rx_verify_immediately_completed: After the last next signal, a complete signal should be observed immediately.
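
As a minimal sketch of the iterator case, the function below emits every item and then completes within the same call. RecordingObserver, Signal and emit_iterator are hypothetical names used only for illustration; they are not part of rx_core.

```rust
/// Signals recorded by the hypothetical observer below.
#[derive(Debug, PartialEq, Clone)]
pub enum Signal<T> {
    Next(T),
    Complete,
}

/// Hypothetical, simplified observer that only records what it receives.
pub struct RecordingObserver<T> {
    pub signals: Vec<Signal<T>>,
}

impl<T> RecordingObserver<T> {
    pub fn new() -> Self {
        Self { signals: Vec::new() }
    }

    pub fn next(&mut self, value: T) {
        self.signals.push(Signal::Next(value));
    }

    pub fn complete(&mut self) {
        self.signals.push(Signal::Complete);
    }
}

/// Emits every item, then completes right after the last `next`,
/// because an exhausted iterator can't produce further values.
pub fn emit_iterator<T>(
    items: impl IntoIterator<Item = T>,
    destination: &mut RecordingObserver<T>,
) {
    for item in items {
        destination.next(item);
    }
    destination.complete();
}
```

A check in the spirit of rx_verify_immediately_completed then only needs to assert that Complete is the signal recorded directly after the last Next.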

rx_contract_do_not_complete_until_necessary

Applies to:

  • Combination Observables

Combination observables should not complete until it becomes impossible to emit further values.

Test must verify:

  • rx_verify_not_closed: If a single input observable unsubscribed, but another one can still trigger emissions, the observable itself should not complete yet.
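
The completion rule of a combine-latest style observable can be sketched as a pure decision function. InputState and combine_latest_should_complete are hypothetical names. The sketch also rests on one assumption about the rule: priming only becomes impossible when an input finishes before it has emitted anything.

```rust
/// Hypothetical, simplified view of one input of a combination observable.
#[derive(Clone, Copy)]
pub struct InputState {
    /// The input has emitted at least one value.
    pub has_emitted: bool,
    /// The input completed or unsubscribed, so it can't emit anymore.
    pub finished: bool,
}

/// Decides whether a combine-latest style observable should complete.
pub fn combine_latest_should_complete(inputs: &[InputState]) -> bool {
    let primed = inputs.iter().all(|input| input.has_emitted);
    if primed {
        // Any input that is still open can trigger a new emission.
        inputs.iter().all(|input| input.finished)
    } else {
        // Assumption: an input that finished without ever emitting
        // makes priming, and therefore any emission, impossible.
        inputs
            .iter()
            .any(|input| input.finished && !input.has_emitted)
    }
}
```

With one finished input and one that is still open, the function returns false once the observable is primed, which is the situation rx_verify_not_closed describes.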

Additional Guidelines

These are additional guidelines to better adhere to the contracts. Some of them are indirectly verified by contracts, others are not testable. Either way, these do not need their own tests and as such can’t be considered contracts by themselves. Think of them as reminders or suggestions.

Operator Subscribers must always forward all upstream signals downstream unless altering them is their expected behavior

Downstream operators depend on signals too, don’t forget to forward them!

The map operator’s only job is to turn the upstream next signal into its mapped value and forward it downstream. It does not alter the behavior of error, complete and unsubscribe, so for each of them it must call the same function on its destination.

For example:

fn complete(&mut self) {
    // Forward the signal unchanged, map does not alter completion.
    self.destination.complete();
}
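
For a fuller picture, here is a self-contained sketch of a map-like subscriber that transforms next and forwards complete untouched. The Observer trait is a hypothetical, reduced stand-in for the real traits (error and unsubscribe would be forwarded the same way), and MapSubscriber and Recorder are illustrative names only.

```rust
use core::marker::PhantomData;

/// Hypothetical, reduced observer trait; the real traits have more members.
pub trait Observer {
    type In;
    fn next(&mut self, next: Self::In);
    fn complete(&mut self);
}

/// Illustrative map subscriber, not the actual rx_core implementation.
pub struct MapSubscriber<In, F, D> {
    mapper: F,
    destination: D,
    _in: PhantomData<fn(In)>,
}

impl<In, F, D> MapSubscriber<In, F, D>
where
    D: Observer,
    F: FnMut(In) -> D::In,
{
    pub fn new(mapper: F, destination: D) -> Self {
        Self { mapper, destination, _in: PhantomData }
    }

    pub fn into_destination(self) -> D {
        self.destination
    }
}

impl<In, F, D> Observer for MapSubscriber<In, F, D>
where
    D: Observer,
    F: FnMut(In) -> D::In,
{
    type In = In;

    fn next(&mut self, next: Self::In) {
        // The only signal map alters: transform, then forward.
        self.destination.next((self.mapper)(next));
    }

    fn complete(&mut self) {
        // Not map's concern, so it is forwarded as-is.
        self.destination.complete();
    }
}

/// Hypothetical destination that records what reaches it.
#[derive(Default)]
pub struct Recorder {
    pub values: Vec<String>,
    pub completed: bool,
}

impl Observer for Recorder {
    type In = String;

    fn next(&mut self, next: String) {
        self.values.push(next);
    }

    fn complete(&mut self) {
        self.completed = true;
    }
}
```

If complete were not forwarded here, Recorder would never learn that the stream ended, and neither would any operator placed after map.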

No unnecessary .is_closed() checks

Only newly produced signals should check if the destination is still open!

For an observable, this does mean every individual signal, as they originate from there.

For example, map only transforms values. Upstream won’t ever send anything after it’s closed, and map interacts with downstream exactly once each time upstream interacts with it. Since map’s is_closed returns downstream’s is_closed state, an upstream producer won’t even try interacting with map once downstream has closed early.

The only exceptions are Subjects, where the Observer functions are exposed to the user.

fn next(&mut self, next: Self::In) {
    if !self.destination.is_closed() { // Unnecessary
        self.destination.next((self.mapper)(next));
    }
}

This only applies to the first synchronous interaction with downstream as any interaction with downstream can potentially cause it to be closed:

Incorrect:

fn next(&mut self, next: Self::In) {
    self.destination.next(next.clone()); // Still not necessary to check!
    self.destination.next(next); // Should be checked if not closed!
}

Correct:

fn next(&mut self, next: Self::In) { // Wouldn't even be called if it's closed!
    self.destination.next(next.clone());
    if !self.is_closed() { // The first next could cause downstream to close!
        self.destination.next(next);
    }
}

A subscriber that sends more than one signal downstream per upstream signal is considered a producer of the extra signals, so the second downstream next call should be checked!

A loop, for example, should break if further iterations are unnecessary.

for item in self.iterator.clone().into_iter() {
    if destination.is_closed() {
        break;
    }
    destination.next(item);
}

As a rule of thumb, if a subscriber’s is_closed implementation already respects the “closedness” of downstream, then for the very first interaction with it, it does not need to check if downstream is closed, as upstream already did.
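
The rule can be made concrete with a small, self-contained sketch. TakeOne is a hypothetical destination that closes after its first value and records every call it receives, so a redundant second call would be visible. EmitTwice is a hypothetical subscriber that sends each value twice.

```rust
/// Hypothetical destination: closes after the first value it receives,
/// but records every call so unnecessary ones can be detected.
pub struct TakeOne {
    pub received: Vec<i32>,
    closed: bool,
}

impl TakeOne {
    pub fn new() -> Self {
        Self { received: Vec::new(), closed: false }
    }

    pub fn is_closed(&self) -> bool {
        self.closed
    }

    pub fn next(&mut self, value: i32) {
        self.received.push(value);
        self.closed = true;
    }
}

/// Hypothetical subscriber that sends every upstream value twice.
pub struct EmitTwice {
    pub destination: TakeOne,
}

impl EmitTwice {
    /// Respects the "closedness" of downstream.
    pub fn is_closed(&self) -> bool {
        self.destination.is_closed()
    }

    pub fn next(&mut self, next: i32) {
        // First interaction: no check needed, upstream already checked.
        self.destination.next(next);
        // Second interaction: a newly produced signal, so check first.
        if !self.is_closed() {
            self.destination.next(next);
        }
    }
}
```

Sending a single value through EmitTwice leaves exactly one entry in received, because the first next closes the destination and the check prevents the second call.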

What if I don’t?

If you do make extra checks, the penalty is just an extra if.

If you do not check whether the destination is closed before sending a new signal, then the signal is sent in vain, and any work downstream operators do with it is also unnecessary.

Neither of these problems is “lethal”; this is about optimization.

Use Never as your signal type if that signal is never sent

The rx_core_common crate exposes the Never type which can’t be constructed since it’s an enum with no variants.

Never is actually just a type alias for core::convert::Infallible. Infallible isn’t used directly because its name conveys that it’s an error, while here it could mean any event/signal that can never happen. And that event can be a valid output too, not just an error.

This type MUST be used to denote signals that are never produced instead of using the unit type () which could be produced, and as such is inadequate to denote that something won’t ever produce said signal.

  • If an Observable never produces an error, it must set its OutError type to Never.
  • If an Observable never produces a value, its Out type must be set to Never.
    • For example, the ThrowObservable only produces a single error, therefore its Out type is Never.
    • And the NeverObservable never produces anything, so both Out and OutError are Never.
  • If a Subscriber never sends errors downstream (for example it catches errors), it also must set its OutError type to Never.
  • If a Subscriber never sends values downstream (for example it re-throws them as errors), it also must set its Out type to Never.
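
To see why an uninhabited type is a stronger statement than (), consider this minimal sketch. It defines a local Never alias that mirrors the one described above instead of importing it from rx_core_common, and unwrap_infallible is a hypothetical helper written only for this example.

```rust
use core::convert::Infallible;

/// Local stand-in for the alias described above, defined here only to
/// keep the example self-contained.
pub type Never = Infallible;

/// Hypothetical helper: since `Never` has no values, the `Err` branch
/// can't be reached and the compiler accepts an empty match for it.
pub fn unwrap_infallible<T>(result: Result<T, Never>) -> T {
    match result {
        Ok(value) => value,
        Err(never) => match never {},
    }
}
```

With () as the error type, the Err branch would need real handling, since Err(()) can be constructed. With Never, the type system itself guarantees that the signal is never produced.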

Note that in the future once Rust stabilizes the actual never type (!), the Never type in rx_core_common will be deprecated in favor of it.

Tracking issue: https://github.com/AlexAegis/rx_bevy/issues/27