9th January 2024
Unit Testing RxJS Observables – A Practical Guide
Unit testing RxJS observation chains can be challenging due to their asynchronous nature. Our latest technical blog looks at simplifying the process.
Table of Contents
- Challenges in Unit Testing RxJS Observables
- Technical Prerequisites
- Unit Testing Objective
- Unit Testing RxJS Observation Chains: Solution Overview
- Unit Testing RxJS Observation Chains: Solution Example
- Unit Testing RxJS Observation Chains: Solution Recap
- Further Resources
- Appendix A: Alternative Solutions Considered for Unit Testing RxJS Observation Chains
Challenges in Unit Testing RxJS Observables
The User Interfaces (UIs) that Adaptive builds for its capital markets clients are typically “reactive”, meaning they have views that update in real time as financial markets change. At Adaptive, we favor using RxJS observables to manage application state, binding them to UI components so that views update when observables emit. Within the state layer of the application, we often have non-trivial observation chains defined by passing operator functions to the pipe() method. These observation chains define how data from several sources are combined and transformed into the view model objects to be consumed by UI components.
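As a rough illustration of what such a chain can look like, here is a minimal sketch that combines two sources into a view model. The names (quote$, quantity$, tileViewModel$) and the shape of the data are hypothetical and are not taken from any Adaptive codebase; the only RxJS APIs used are BehaviorSubject, combineLatest and map.

```typescript
import { BehaviorSubject, combineLatest, map } from 'rxjs'

// Hypothetical source data: a quote for one instrument and a user-selected quantity.
interface Quote {
  symbol: string
  bid: number
  ask: number
}

// Hypothetical view model consumed by a UI component.
interface TileViewModel {
  symbol: string
  spread: number
  quantity: number
}

const quote$ = new BehaviorSubject<Quote>({ symbol: 'XOM', bid: 100.5, ask: 101 })
const quantity$ = new BehaviorSubject<number>(1_000)

// The observation chain: combine both sources, then transform into a view model.
const tileViewModel$ = combineLatest([quote$, quantity$]).pipe(
  map(
    ([quote, quantity]): TileViewModel => ({
      symbol: quote.symbol,
      spread: quote.ask - quote.bid,
      quantity,
    })
  )
)

// Stand-in for a UI binding: record every view model that would be rendered.
const rendered: TileViewModel[] = []
const subscription = tileViewModel$.subscribe((viewModel) => {
  rendered.push(viewModel)
})

quote$.next({ symbol: 'XOM', bid: 100.75, ask: 101 }) // new quote, new view model
quantity$.next(2_000) // new quantity, new view model
subscription.unsubscribe()
```

Each source emission produces a fresh view model, which is exactly the kind of behavior the rest of this article sets out to verify.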
The correctness of the logic in these observation chains is often essential to the behavior of the app, yet it can be challenging to unit test because observables asynchronously emit multiple values over time.
So how can we effectively unit test RxJS observation chains to verify that they behave as we expect them to?
Technical Prerequisites
The reader is assumed to have familiarity with TypeScript, RxJS and a unit testing framework such as Vitest or Jest. This article will not focus on any specific UI frameworks or view/component libraries. If you’re using RxJS to manage application state in a reactive manner, this content should be relevant regardless of whether you’re using React, Vue.js, Angular, or any other framework of the week.
The example code in this article can be found in the accompanying GitHub repo. You can trace the progression of the example’s development through this article using the tags in the repo.
Unit Testing Objective
The Adaptive team wanted to be able to test the various permutations of state emitted by an observable created by an observation chain, given the various inputs into said chain via the upstream source observables that it depends on.
The units we want to test are stateful observables that are exported as is, or functions that accept a key (such as an entity ID) and return an observable. The observation chain below is one such unit, and this article will demonstrate how to unit test it.
import { Observable, map, mergeWith, scan, startWith } from 'rxjs'
import { pricesDto$, resetPrices$ } from './service'

export const prices$: Observable<Record<string, number>> = pricesDto$.pipe(
  scan(
    (accum, current) => ({
      ...accum,
      [current.symbol]: current.price,
    }),
    {}
  ),
  mergeWith(resetPrices$.pipe(map(() => ({})))),
  startWith({})
)
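The other kind of unit mentioned above, a function that accepts a key and returns an observable, might look something like the following sketch. getPrice$ is a hypothetical name; to keep the snippet self-contained, the price table observable is passed in as a parameter rather than imported from a module.

```typescript
import { Observable, Subject, distinctUntilChanged, map } from 'rxjs'

type PriceTable = Record<string, number>

// Hypothetical key-accepting unit: given a symbol, return an observable of
// that symbol's price, emitting only when the price actually changes.
export function getPrice$(
  prices$: Observable<PriceTable>,
  symbol: string
): Observable<number | undefined> {
  return prices$.pipe(
    map((table) => table[symbol]),
    distinctUntilChanged()
  )
}

// Drive the function with a Subject standing in for the real price table.
const table$ = new Subject<PriceTable>()
const xomPrices: Array<number | undefined> = []
const subscription = getPrice$(table$, 'XOM').subscribe((price) => {
  xomPrices.push(price)
})

table$.next({ XOM: 48.17 })
table$.next({ XOM: 48.17, BA: 218.93 }) // XOM unchanged, so nothing is emitted
table$.next({ XOM: 48.21, BA: 218.93 })
subscription.unsubscribe()
// xomPrices now holds 48.17 followed by 48.21
```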
We explored and rejected some approaches which are described in Appendix A: Alternative Solutions Considered for Unit Testing RxJS Observation Chains, before developing the pattern that we present here.
Unit Testing RxJS Observation Chains: Solution Overview
As we will demonstrate below, we achieve our testing goals by subscribing to the observable under test with spy functions so that we can assert on what was emitted, and mocking out the source observables with RxJS subjects. We do this with a very small utility function called spyOnObservable() that abstracts away the boilerplate associated with spying on observers. We are using Vitest here, but the semantics are very similar if you are using Jest. If you’re using a different testing library (such as Mocha, Jasmine, or QUnit), the semantics might be a bit different, but the concepts and functionality should be easily transferable.
import { Observable, Subscription } from 'rxjs'
import { Mock, vi } from 'vitest'

/**
 * Utility function for testing observables.
 * Returns an object containing mock observer functions.
 *
 * To ensure your test does not cause a memory leak, assert that `complete`
 * has been called; this will verify that this utility has unsubscribed
 * from the observable under test. Alternatively, explicitly unsubscribe
 * the subscription that is returned.
 *
 * Example usage:
 *
 * const { next, error, complete, subscription, latestEmission, emissionCount } =
 *   spyOnObservable(observableToTest$.pipe(take(1)))
 *
 * expect(next).toHaveBeenCalledTimes(1)
 * expect(next).toHaveBeenCalledWith(someValue)
 * expect(latestEmission()).toBe(someValue)
 * expect(error).not.toHaveBeenCalled()
 * subscription.unsubscribe()
 * expect(complete).toHaveBeenCalled()
 */
export function spyOnObservable(observable$: Observable<unknown>) {
  const next: Mock<any, any> = vi.fn()
  const error: Mock<any, any> = vi.fn()
  const complete: Mock<any, any> = vi.fn()

  // number of values emitted so far
  const emissionCount = () => next.mock.calls.length

  // most recently emitted value; throws if nothing has been emitted yet
  const latestEmission = () => {
    try {
      return next.mock.calls.at(-1)![0]
    } catch (e) {
      throw new Error('expected next to have been called')
    }
  }

  let subscription: Subscription

  subscription = observable$.subscribe({
    next,
    error,
    complete: () => {
      // subscription is still undefined if the observable completes
      // synchronously during subscribe(), hence the optional chaining
      subscription?.unsubscribe()
      complete()
    },
  })

  return { next, error, complete, subscription, latestEmission, emissionCount }
}
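If your test runner does not provide mock functions, the same idea can be approximated with plain arrays. The following is a minimal sketch under that assumption; recordObservable is a hypothetical name and not part of the accompanying repo, and it deliberately offers less than spyOnObservable() (there are no call-count matchers, for example).

```typescript
import { Observable, Subject, Subscription } from 'rxjs'

// Hypothetical framework-agnostic variant: records notifications in plain
// arrays instead of mock functions.
export function recordObservable<T>(observable$: Observable<T>) {
  const emissions: T[] = []
  const errors: unknown[] = []
  let completed = false

  const subscription: Subscription = observable$.subscribe({
    next: (value) => {
      emissions.push(value)
    },
    error: (err) => {
      errors.push(err)
    },
    complete: () => {
      completed = true
    },
  })

  return {
    emissions,
    errors,
    isCompleted: () => completed,
    // most recently emitted value; throws if nothing has been emitted yet
    latestEmission: (): T => {
      if (emissions.length === 0) {
        throw new Error('expected at least one emission')
      }
      return emissions[emissions.length - 1]
    },
    subscription,
  }
}

// Example usage with a Subject standing in for the observable under test.
const source$ = new Subject<number>()
const recorder = recordObservable(source$)
source$.next(1)
source$.next(2)
source$.complete()
```

Assertions can then be written with whatever the test library offers, for example by comparing recorder.emissions against an expected array.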
Unit Testing RxJS Observation Chains: Solution Example
We will use the following contrived use case to demonstrate how we would apply this in real life. We chose a use case that is materially simpler than what we’d encounter in real life, but with sufficient complexity to illustrate the problem and how we’d solve it.
Say we have an application that needs to display prices of financial instruments updating in real time. In order to do this, we want a lookup table of symbols to prices, with an entry for every instrument that we have a price for. We have an observable that will emit an object containing the latest value of this table every time there is a price update. This is the observable that we want to test.
The observable under test depends on two source observables:
- The first will emit an object containing a symbol and a price for an individual instrument every time there is a price update for that instrument. (In a real world application this observable would be an abstraction over a WebSocket stream.)
- The second represents an event that, when triggered, should reset the state of the price lookup table to its initial empty state.
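To make the intended behavior concrete before looking at the observable itself, here is a plain TypeScript sketch (no RxJS involved) that folds a list of events into successive versions of the lookup table. The PriceEvent type and the applyEvent and tableHistory helpers are hypothetical and exist only for this illustration; the Price shape is assumed from how it is used later in this article.

```typescript
// Assumed shape of a single price update.
interface Price {
  symbol: string
  price: number
}

// Hypothetical event type covering both sources described above.
type PriceEvent = { kind: 'price'; update: Price } | { kind: 'reset' }
type PriceTable = Record<string, number>

// Apply one event to the current table, returning the next table.
function applyEvent(table: PriceTable, event: PriceEvent): PriceTable {
  return event.kind === 'reset'
    ? {}
    : { ...table, [event.update.symbol]: event.update.price }
}

// Return every version of the table, starting from the initial empty state.
function tableHistory(events: PriceEvent[]): PriceTable[] {
  const history: PriceTable[] = [{}]
  for (const event of events) {
    history.push(applyEvent(history[history.length - 1], event))
  }
  return history
}

const history = tableHistory([
  { kind: 'price', update: { symbol: 'XOM', price: 48.17 } },
  { kind: 'price', update: { symbol: 'BA', price: 218.93 } },
  { kind: 'price', update: { symbol: 'XOM', price: 48.21 } },
  { kind: 'reset' },
  { kind: 'price', update: { symbol: 'HD', price: 332.12 } },
])
// history:
// {}, { XOM: 48.17 }, { XOM: 48.17, BA: 218.93 },
// { XOM: 48.21, BA: 218.93 }, {}, { HD: 332.12 }
```

The observable under test should emit this same sequence of tables when its two sources emit the corresponding events.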
Our initial implementation of this observable is below. Without tests, however, we don’t know for certain that it behaves as intended.
import { Observable, map, mergeWith, scan, startWith } from 'rxjs'
import { pricesDto$, resetPrices$ } from './service'

export const prices$: Observable<Record<string, number>> = pricesDto$.pipe(
  scan(
    (accum, current) => ({
      ...accum,
      [current.symbol]: current.price,
    }),
    {}
  ),
  mergeWith(resetPrices$.pipe(map(() => ({})))),
  startWith({})
)
For additional context, see service.ts and model.ts
Let’s set up our test file.
import { Subject } from 'rxjs'
import { Price } from './model'

// create subjects to mock out the source observables that our
// observable under test depends on
const mockPricesDto$ = new Subject<Price>()
const mockResetPrices$ = new Subject<void>()

// use doMock() rather than mock() so we can reference the
// variables containing the mock observables. mock() is hoisted
// so it does not allow referencing variables in the file scope.
// see https://vitest.dev/api/vi#vi-mock and
// https://vitest.dev/api/vi#vi-domock
vi.doMock('./service', () => ({
  pricesDto$: mockPricesDto$,
  resetPrices$: mockResetPrices$,
}))

// we need to dynamically import the observable under test
// after we call vi.doMock. see https://vitest.dev/api/vi#vi-domock
const { prices$ } = await import('./state')

describe('prices$', () => {
  // we are now ready to add our tests here
})
Now that we have the shell of the test file set up, let’s add some basic assertions. Note the usage of our spyOnObservable() utility function.
import { Subject } from 'rxjs'
import { Price } from './model'
import { spyOnObservable } from './spyOnObservable'

// create subjects to mock out the source observables that our
// observable under test depends on
const mockPricesDto$ = new Subject<Price>()
const mockResetPrices$ = new Subject<void>()

// use doMock() rather than mock() so we can reference the
// variables containing the mock observables. mock() is hoisted
// so it does not allow referencing variables in the file scope.
// see https://vitest.dev/api/vi#vi-mock and
// https://vitest.dev/api/vi#vi-domock
vi.doMock('./service', () => ({
  pricesDto$: mockPricesDto$,
  resetPrices$: mockResetPrices$,
}))

// we need to dynamically import the observable under test
// after we call vi.doMock. see https://vitest.dev/api/vi#vi-domock
const { prices$ } = await import('./state')

describe('prices$', () => {
  // spy on the observable under test, using the spyOnObservable utility
  const { latestEmission, error, subscription } = spyOnObservable(prices$)

  // ensure we unsubscribe when we are done to avoid memory leaks
  afterAll(() => {
    subscription.unsubscribe()
  })

  it('should initially emit empty object', () => {
    expect(latestEmission()).toEqual({})
  })

  it('should not error', () => {
    expect(error).not.toBeCalled()
  })
})
And our tests are passing:

So far so good. Now let’s add some assertions about what the behavior will be when the source observable pricesDto$ emits.
import { Subject } from 'rxjs'
import { Price } from './model'
import { spyOnObservable } from './spyOnObservable'

// create subjects to mock out the source observables that our
// observable under test depends on
const mockPricesDto$ = new Subject<Price>()
const mockResetPrices$ = new Subject<void>()

// use doMock() rather than mock() so we can reference the
// variables containing the mock observables. mock() is hoisted
// so it does not allow referencing variables in the file scope.
// see https://vitest.dev/api/vi#vi-mock and
// https://vitest.dev/api/vi#vi-domock
vi.doMock('./service', () => ({
  pricesDto$: mockPricesDto$,
  resetPrices$: mockResetPrices$,
}))

// we need to dynamically import the observable under test
// after we call vi.doMock. see https://vitest.dev/api/vi#vi-domock
const { prices$ } = await import('./state')

describe('prices$', () => {
  // spy on the observable under test, using the spyOnObservable utility
  const { latestEmission, error, subscription } = spyOnObservable(prices$)

  // ensure we unsubscribe when we are done to avoid memory leaks
  afterAll(() => {
    subscription.unsubscribe()
  })

  it('should initially emit empty object', () => {
    expect(latestEmission()).toEqual({})
  })

  it('should emit object containing latest prices after pricesDto$ emits', () => {
    // call next() on the subject that mocks out the source observable
    // pricesDto$ that the observable under test depends on, to simulate
    // that observable emitting prices, and ensure the new price is
    // emitted as expected in the observable under test
    mockPricesDto$.next({ symbol: 'XOM', price: 48.17 })
    expect(latestEmission()).toEqual({ XOM: 48.17 })

    // add another instrument/price, ensure both instruments
    // appear in the resulting emission
    mockPricesDto$.next({ symbol: 'BA', price: 218.93 })
    expect(latestEmission()).toEqual({ XOM: 48.17, BA: 218.93 })

    // update the price of the first instrument, ensure the price is
    // updated in the resulting emission
    mockPricesDto$.next({ symbol: 'XOM', price: 48.21 })
    expect(latestEmission()).toEqual({ XOM: 48.21, BA: 218.93 })
  })

  it('should not error', () => {
    expect(error).not.toBeCalled()
  })
})
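The assertions above can run immediately after each next() call because a Subject delivers values to its subscribers synchronously, and operators such as scan() and map() do not introduce any scheduling of their own. The following standalone sketch, which uses a hypothetical doubling chain rather than the article’s prices$ observable, shows the ordering:

```typescript
import { Subject, map } from 'rxjs'

const source$ = new Subject<number>()
const log: string[] = []

// Subscribe to a simple chain and log each value as it arrives.
const subscription = source$.pipe(map((n) => n * 2)).subscribe((value) => {
  log.push(`received ${value}`)
})

log.push('before next')
source$.next(21) // the subscriber runs before next() returns
log.push('after next')
subscription.unsubscribe()
// log: 'before next', 'received 42', 'after next'
```

This holds only while the chain stays synchronous. A chain that uses time-based operators such as delay() or debounceTime() would not emit before next() returns, so tests for it would likely need fake timers or a similar mechanism.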
Great, our tests are still passing:

Now let’s add an assertion for the reset event.
import { Subject } from 'rxjs'
import { Price } from './model'
import { spyOnObservable } from './spyOnObservable'

// create subjects to mock out the source observables that our
// observable under test depends on
const mockPricesDto$ = new Subject<Price>()
const mockResetPrices$ = new Subject<void>()

// use doMock() rather than mock() so we can reference the
// variables containing the mock observables. mock() is hoisted
// so it does not allow referencing variables in the file scope.
// see https://vitest.dev/api/vi#vi-mock and
// https://vitest.dev/api/vi#vi-domock
vi.doMock('./service', () => ({
  pricesDto$: mockPricesDto$,
  resetPrices$: mockResetPrices$,
}))

// we need to dynamically import the observable under test
// after we call vi.doMock. see https://vitest.dev/api/vi#vi-domock
const { prices$ } = await import('./state')

describe('prices$', () => {
  // spy on the observable under test, using the spyOnObservable utility
  const { latestEmission, error, subscription } = spyOnObservable(prices$)

  // ensure we unsubscribe when we are done to avoid memory leaks
  afterAll(() => {
    subscription.unsubscribe()
  })

  it('should initially emit empty object', () => {
    expect(latestEmission()).toEqual({})
  })

  it('should emit object containing latest prices after pricesDto$ emits', () => {
    // call next() on the subject that mocks out the source observable
    // pricesDto$ that the observable under test depends on, to simulate
    // that observable emitting prices, and ensure the new price is
    // emitted as expected in the observable under test
    mockPricesDto$.next({ symbol: 'XOM', price: 48.17 })
    expect(latestEmission()).toEqual({ XOM: 48.17 })

    // add another instrument/price, ensure both instruments
    // appear in the resulting emission
    mockPricesDto$.next({ symbol: 'BA', price: 218.93 })
    expect(latestEmission()).toEqual({ XOM: 48.17, BA: 218.93 })

    // update the price of the first instrument, ensure the price is
    // updated in the resulting emission
    mockPricesDto$.next({ symbol: 'XOM', price: 48.21 })
    expect(latestEmission()).toEqual({ XOM: 48.21, BA: 218.93 })
  })

  it('should emit empty object after resetPrices$ emits', () => {
    // call next() on the subject that mocks out the source observable
    // resetPrices$ that the observable under test depends on, to simulate
    // that observable emitting, and ensure that the prices lookup table
    // is reset to an empty object
    mockResetPrices$.next()
    expect(latestEmission()).toEqual({})
  })

  it('should not error', () => {
    expect(error).not.toBeCalled()
  })
})
Great, the tests pass yet again.

Now let’s add an assertion that on the next prices that flow in after a reset, we only have the new prices in the lookup table.
import { Subject } from 'rxjs'
import { Price } from './model'
import { spyOnObservable } from './spyOnObservable'

// create subjects to mock out the source observables that our
// observable under test depends on
const mockPricesDto$ = new Subject<Price>()
const mockResetPrices$ = new Subject<void>()

// use doMock() rather than mock() so we can reference the
// variables containing the mock observables. mock() is hoisted
// so it does not allow referencing variables in the file scope.
// see https://vitest.dev/api/vi#vi-mock and
// https://vitest.dev/api/vi#vi-domock
vi.doMock('./service', () => ({
  pricesDto$: mockPricesDto$,
  resetPrices$: mockResetPrices$,
}))

// we need to dynamically import the observable under test
// after we call vi.doMock. see https://vitest.dev/api/vi#vi-domock
const { prices$ } = await import('./state')

describe('prices$', () => {
  // spy on the observable under test, using the spyOnObservable utility
  const { latestEmission, error, subscription } = spyOnObservable(prices$)

  // ensure we unsubscribe when we are done to avoid memory leaks
  afterAll(() => {
    subscription.unsubscribe()
  })

  it('should initially emit empty object', () => {
    expect(latestEmission()).toEqual({})
  })

  it('should emit object containing latest prices after pricesDto$ emits', () => {
    // call next() on the subject that mocks out the source observable
    // pricesDto$ that the observable under test depends on, to simulate
    // that observable emitting prices, and ensure the new price is
    // emitted as expected in the observable under test
    mockPricesDto$.next({ symbol: 'XOM', price: 48.17 })
    expect(latestEmission()).toEqual({ XOM: 48.17 })

    // add another instrument/price, ensure both instruments
    // appear in the resulting emission
    mockPricesDto$.next({ symbol: 'BA', price: 218.93 })
    expect(latestEmission()).toEqual({ XOM: 48.17, BA: 218.93 })

    // update the price of the first instrument, ensure the price is
    // updated in the resulting emission
    mockPricesDto$.next({ symbol: 'XOM', price: 48.21 })
    expect(latestEmission()).toEqual({ XOM: 48.21, BA: 218.93 })
  })

  it('should emit empty object after resetPrices$ emits', () => {
    // call next() on the subject that mocks out the source observable
    // resetPrices$ that the observable under test depends on, to simulate
    // that observable emitting, and ensure that the prices lookup table
    // is reset to an empty object
    mockResetPrices$.next()
    expect(latestEmission()).toEqual({})
  })

  it('should emit object containing only the latest prices after pricesDto$ emits', () => {
    // emit prices for two new instruments after the reset, ensure that
    // none of the prices from before the reset remain in the lookup table
    mockPricesDto$.next({ symbol: 'HD', price: 332.12 })
    mockPricesDto$.next({ symbol: 'AA', price: 24.49 })
    expect(latestEmission()).toEqual({ HD: 332.12, AA: 24.49 })
  })

  it('should not error', () => {
    expect(error).not.toBeCalled()
  })
})
…and we have a failing test:

It seems our test has uncovered a bug in our implementation! We want the state to be reset to an empty object when the reset observable emits. But the accumulator object in the reducer function that we pass to scan() retains the state even after the reset. Let’s fix that by moving the mergeWith() up before the scan(), and adding a conditional within the reducer function to replace the state with an empty object in the event of a reset. (In this case, the value flowing into scan() will be falsy rather than a Price object.)
import { Observable, mergeWith, scan, startWith } from 'rxjs'
import { pricesDto$, resetPrices$ } from './service'
import { Price } from './model'

export const prices$: Observable<Record<string, number>> = pricesDto$.pipe(
  mergeWith(resetPrices$),
  scan(
    (accum, current: Price | void) =>
      !current
        ? {}
        : {
            ...accum,
            [current.symbol]: current.price,
          },
    {}
  ),
  startWith({})
)
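To see the difference outside of a test runner, the two orderings can be compared side by side. In this sketch both chains are wrapped in hypothetical factory functions (buggyPrices$ and fixedPrices$) that take the source observables as parameters, and the Price shape is assumed from its usage above; a small collect helper drives each chain with the same sequence of events.

```typescript
import { Observable, Subject, map, mergeWith, scan, startWith } from 'rxjs'

interface Price {
  symbol: string
  price: number
}
type PriceTable = Record<string, number>
const emptyTable = (): PriceTable => ({})

// Original ordering: the reset emits {}, but scan's accumulator is untouched.
function buggyPrices$(
  pricesDto$: Observable<Price>,
  resetPrices$: Observable<void>
): Observable<PriceTable> {
  return pricesDto$.pipe(
    scan(
      (accum: PriceTable, current: Price): PriceTable => ({
        ...accum,
        [current.symbol]: current.price,
      }),
      emptyTable()
    ),
    mergeWith(resetPrices$.pipe(map(emptyTable))),
    startWith(emptyTable())
  )
}

// Fixed ordering: resets flow through scan, which clears the accumulator.
function fixedPrices$(
  pricesDto$: Observable<Price>,
  resetPrices$: Observable<void>
): Observable<PriceTable> {
  return pricesDto$.pipe(
    mergeWith(resetPrices$),
    scan(
      (accum: PriceTable, current: Price | void): PriceTable =>
        !current ? {} : { ...accum, [current.symbol]: current.price },
      emptyTable()
    ),
    startWith(emptyTable())
  )
}

// Drive a chain with: price, reset, price. Return everything it emitted.
function collect(
  make: (p$: Observable<Price>, r$: Observable<void>) => Observable<PriceTable>
): PriceTable[] {
  const pricesDto$ = new Subject<Price>()
  const resetPrices$ = new Subject<void>()
  const emissions: PriceTable[] = []
  const subscription = make(pricesDto$, resetPrices$).subscribe((table) => {
    emissions.push(table)
  })
  pricesDto$.next({ symbol: 'XOM', price: 48.17 })
  resetPrices$.next()
  pricesDto$.next({ symbol: 'HD', price: 332.12 })
  subscription.unsubscribe()
  return emissions
}

const buggy = collect(buggyPrices$)
// last emission: { XOM: 48.17, HD: 332.12 } (stale XOM survives the reset)
const fixed = collect(fixedPrices$)
// last emission: { HD: 332.12 }
```

Both chains emit an empty object at the moment of the reset, which is why the earlier reset test passed; only the emission after the reset reveals the retained state.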
Our tests are now passing again, giving us confidence that our observation chain does what we want it to.

Unit Testing RxJS Observation Chains: Solution Recap
The spyOnObservable utility provides an easy-to-use API for spying on observables to facilitate asserting on their behavior. This utility, paired with mocking the upstream observables with RxJS Subjects, defines a scalable pattern for writing syntactically pleasing unit tests that simulate how observation chains behave in the real world given various inputs via the upstream source observables.
Feel free to copy spyOnObservable, adapt it to your needs as you see fit, and use it without attribution.
Further Resources
If you’re interested in learning more about how to implement reactive programming principles, please see Reactive Trader® and its open source codebase. Reactive Trader® is Adaptive’s real-time foreign exchange (FX) and credit trading platform designed to showcase reactive programming principles across the full application stack.
If you’d like to learn about React-RxJS, a library that provides bindings to integrate RxJS observables with React, please see our 2020 post Why React RxJS.
Appendix A: Alternative Solutions Considered for Unit Testing RxJS Observation Chains
Marble testing is well suited for testing RxJS operators (such as map(), first(), etc.), that is, factory functions that create an operator function. In fact, the unit tests for the RxJS operators within the RxJS code base use marble tests. Marble testing can also be useful when testing a function that accepts an observable and returns another observable.
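For readers unfamiliar with the technique, here is a minimal sketch of a marble test for a simple operator, using the TestScheduler from rxjs/testing. The assertDeepEqual helper and the runDoubleMarbleTest wrapper are hypothetical stand-ins so that the snippet runs without a test framework; in a real test you would normally pass your framework’s deep equality assertion to the TestScheduler instead.

```typescript
import { map } from 'rxjs'
import { TestScheduler } from 'rxjs/testing'

// Hypothetical stand-in for a test framework's deep equality assertion.
function assertDeepEqual(actual: unknown, expected: unknown): void {
  const actualJson = JSON.stringify(actual)
  const expectedJson = JSON.stringify(expected)
  if (actualJson !== expectedJson) {
    throw new Error(`expected ${expectedJson} but got ${actualJson}`)
  }
}

// Marble test for an operator that doubles each value.
export function runDoubleMarbleTest(): void {
  const testScheduler = new TestScheduler(assertDeepEqual)

  testScheduler.run(({ cold, expectObservable }) => {
    // each character is one frame of virtual time; '|' marks completion
    const source$ = cold('a-b-c|', { a: 1, b: 2, c: 3 })
    const result$ = source$.pipe(map((n) => n * 2))

    // assert on the whole stream of emissions at once
    expectObservable(result$).toBe('a-b-c|', { a: 2, b: 4, c: 6 })
  })
}

runDoubleMarbleTest() // throws if the actual stream differs from the expected one
```

Note that the expected output is declared up front as a single diagram covering the entire stream.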
For our use case of testing stateful observation chains, however, marble testing does not seem to be a good fit. Furthermore, marble tests assert on the entire resulting stream of emissions. We wanted to be able to test what gets emitted play by play, interactively, so we can verify the expected result after each emission by the source ob