Unit Testing RxJS Observables – A Practical Guide

Unit testing RxJS observation chains can be challenging due to their asynchronous nature. Our latest technical blog looks at simplifying the process.

Table of Contents

  1. Challenges in Unit Testing RxJS Observables
  2. Technical Prerequisites
  3. Unit Testing Objective
  4. Unit Testing RxJS Observation Chains: Solution Overview
  5. Unit Testing RxJS Observation Chains: Solution Example
  6. Unit Testing RxJS Observation Chains: Solution Recap
  7. Further Resources
  8. Appendix A: Alternative Solutions Considered for Unit Testing RxJS Observation Chains

Challenges in Unit Testing RxJS Observables

The User Interfaces (UIs) that Adaptive builds for its capital markets clients are typically “reactive”, meaning they have views that update in real time as financial markets change. At Adaptive, we favor using RxJS observables to manage application state, binding them to UI components so that views update when observables emit. Within the state layer of the application, we often have non-trivial observation chains defined by passing operator functions to the pipe() method. These observation chains define how data from several sources are combined and transformed into the view model objects to be consumed by UI components. 

The correctness of the logic in these observation chains is often essential to the behavior of the app, yet it can be challenging to unit test because observables asynchronously emit multiple values over time.

So how can we effectively unit test RxJS observation chains to verify that they behave as we expect them to?

Technical Prerequisites

The reader is assumed to have familiarity with TypeScript, RxJS and a unit testing framework such as Vitest or Jest. This article will not focus on any specific UI frameworks or view/component libraries. If you’re using RxJS to manage application state in a reactive manner, this content should be relevant regardless of whether you’re using React, Vue.js, Angular, or any other framework of the week.

The example code in this article can be found in the accompanying GitHub repo. You can trace the progression of the example’s development through this article using the tags in the repo.

Unit Testing Objective

The Adaptive team wanted to be able to test the various permutations of state emitted by an observable created by an observation chain, given the various inputs into said chain via the upstream source observables that it depends on. 

The units we want to test are stateful observables that are exported as is, or functions that accept a key (such as an entity ID) and return an observable. The observation chain below is one example, and this article will demonstrate how to unit test it.

Stage 1: state.ts

import { Observable, map, mergeWith, scan, startWith } from 'rxjs'
import { pricesDto$, resetPrices$ } from './service'

export const prices$: Observable<Record<string, number>> = pricesDto$.pipe(
  scan(
    (accum, current) => ({
      ...accum,
      [current.symbol]: current.price,
    }),
    {}
  ),
  mergeWith(resetPrices$.pipe(map(() => ({})))),
  startWith({})
)
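The other kind of unit mentioned above, a function that accepts a key and returns an observable, might look like the following sketch. The names `priceFor$` and `allPrices$` are hypothetical and not part of the accompanying repo; a `BehaviorSubject` stands in for the real price table so that the block runs on its own.

```typescript
import { BehaviorSubject, Observable, distinctUntilChanged, map } from 'rxjs'

// Hypothetical stand-in for a stateful price table such as prices$
const allPrices$ = new BehaviorSubject<Record<string, number>>({})

// Accepts a key (here, a symbol) and returns an observable of that
// instrument's price, emitting only when the price actually changes
function priceFor$(symbol: string): Observable<number | undefined> {
  return allPrices$.pipe(
    map((prices): number | undefined => prices[symbol]),
    distinctUntilChanged()
  )
}

const seen: Array<number | undefined> = []
const subscription = priceFor$('XOM').subscribe((price) => seen.push(price))

allPrices$.next({ XOM: 48.17 })
allPrices$.next({ XOM: 48.17, BA: 218.93 }) // XOM unchanged, so no emission
allPrices$.next({ XOM: 48.21, BA: 218.93 })
subscription.unsubscribe()

// seen is now [undefined, 48.17, 48.21]
```

A unit like this can be tested with the same approach as an exported observable: call the function with a key, subscribe to the result, and drive the upstream source by hand.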

Before developing the pattern that we present here, we explored and rejected some other approaches, which are described in Appendix A: Alternative Solutions Considered for Unit Testing RxJS Observation Chains.

Unit Testing RxJS Observation Chains: Solution Overview

As we will demonstrate below, we achieve our testing goals by subscribing to the observable under test with spy functions, so that we can assert on what was emitted, and by mocking out the source observables with RxJS subjects. We do this with a very small utility function called spyOnObservable() that abstracts away the boilerplate associated with spying on observers. We are using Vitest here, but the semantics are very similar if you are using Jest. If you’re using a different testing library (such as Mocha, Jasmine, or QUnit), the semantics might be a bit different, but the concepts and functionality should be easily transferable.

spyOnObservable.ts

import { Observable, Subscription } from 'rxjs'
import { Mock, vi } from 'vitest'

/**
 * Utility function for testing observables.
 * Returns an object containing mock observer functions.
 *
 * To ensure your test does not cause a memory leak, assert that `complete`
 * has been called; this will verify that this utility has unsubscribed
 * from the observable under test. Alternatively, explicitly unsubscribe
 * the subscription that is returned.
 *
 * Example usage:
 *
 *    const { next, error, complete, subscription, latestEmission, emissionCount } =
 *      spyOnObservable(observableToTest$.pipe(take(1)))
 *
 *    expect(next).toHaveBeenCalledTimes(1)
 *    expect(next).toHaveBeenCalledWith(someValue)
 *    expect(latestEmission()).toBe(someValue)
 *    expect(error).not.toHaveBeenCalled()
 *    subscription.unsubscribe()
 *    expect(complete).toHaveBeenCalled()
 */
export function spyOnObservable(observable$: Observable<unknown>) {
  const next: Mock<any, any> = vi.fn()
  const error: Mock<any, any> = vi.fn()
  const complete: Mock<any, any> = vi.fn()
  const emissionCount = () => next.mock.calls.length
  const latestEmission = () => {
    try {
      return next.mock.calls.at(-1)![0]
    } catch (e) {
      throw new Error('expected next to have been called')
    }
  }

  let subscription: Subscription

  subscription = observable$.subscribe({
    next,
    error,
    complete: () => {
      subscription?.unsubscribe()
      complete()
    },
  })

  return { next, error, complete, subscription, latestEmission, emissionCount }
}
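For test libraries without a built-in mock function, the same idea can be sketched with plain arrays instead of `vi.fn()`. The name `recordObservable` is hypothetical; this is a simplified variant written under that assumption, not a drop-in replacement for the utility above.

```typescript
import { Observable, Subject, Subscription } from 'rxjs'

// Hypothetical, framework-agnostic variant of spyOnObservable():
// records what the observable does in plain arrays and a flag
function recordObservable<T>(observable$: Observable<T>) {
  const emissions: T[] = []
  const errors: unknown[] = []
  let completed = false

  const subscription: Subscription = observable$.subscribe({
    next: (value) => {
      emissions.push(value)
    },
    error: (err) => {
      errors.push(err)
    },
    complete: () => {
      completed = true
    },
  })

  return {
    emissions,
    errors,
    subscription,
    isCompleted: () => completed,
    latestEmission: (): T => {
      if (emissions.length === 0) {
        throw new Error('expected the observable to have emitted')
      }
      return emissions[emissions.length - 1] as T
    },
  }
}

// Usage: drive a subject by hand and inspect what was recorded
const source$ = new Subject<number>()
const recorded = recordObservable(source$)
source$.next(1)
source$.next(2)
source$.complete()
```

After this runs, `recorded.emissions` holds both values in order, and `recorded.isCompleted()` reports that the source completed.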

Unit Testing RxJS Observation Chains: Solution Example

We will use the following contrived use case to demonstrate how we would apply this in real life. We chose a use case that is materially simpler than what we’d encounter in real life, but with sufficient complexity to illustrate the problem and how we’d solve it.

Say we have an application that needs to display prices of financial instruments updating in real time. In order to do this, we want a lookup table of symbols to prices, with an entry for every instrument that we have a price for. We have an observable that will emit an object containing the latest value of this table every time there is a price update. This is the observable that we want to test.

The observable under test depends on two source observables:

  • The first will emit an object containing a symbol and a price for an individual instrument every time there is a price update for that instrument. (In a real world application this observable would be an abstraction over a WebSocket stream.) 
  • The second represents an event, that when triggered, should reset the state of the price lookup table to its initial empty state.

Our initial implementation of this observable is below. Without tests, however, we don’t know for certain that it behaves as intended.

Stage 1: state.ts

import { Observable, map, mergeWith, scan, startWith } from 'rxjs'
import { pricesDto$, resetPrices$ } from './service'

export const prices$: Observable<Record<string, number>> = pricesDto$.pipe(
  scan(
    (accum, current) => ({
      ...accum,
      [current.symbol]: current.price,
    }),
    {}
  ),
  mergeWith(resetPrices$.pipe(map(() => ({})))),
  startWith({})
)

For additional context, see service.ts and model.ts in the accompanying repo.
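If the repo is not at hand, the two modules can be pictured roughly as follows. This is an assumption made for illustration, inferred only from how `Price`, `pricesDto$` and `resetPrices$` are used in this article. The real files may differ, and the names `pricesSubject` and `resetSubject` are hypothetical stand-ins for whatever transport the real service wraps.

```typescript
import { Observable, Subject } from 'rxjs'

// model.ts (assumed shape, inferred from the test data in this article)
interface Price {
  symbol: string
  price: number
}

// service.ts (hypothetical): subjects stand in for the real sources,
// e.g. a WebSocket stream of prices and a UI-triggered reset event
const pricesSubject = new Subject<Price>()
const resetSubject = new Subject<void>()

// expose plain observables so consumers cannot push values themselves
const pricesDto$: Observable<Price> = pricesSubject.asObservable()
const resetPrices$: Observable<void> = resetSubject.asObservable()

// quick check that values pushed into the subjects reach consumers
const received: Price[] = []
let resetCount = 0
const priceSub = pricesDto$.subscribe((price) => {
  received.push(price)
})
const resetSub = resetPrices$.subscribe(() => {
  resetCount += 1
})

pricesSubject.next({ symbol: 'XOM', price: 48.17 })
resetSubject.next()
priceSub.unsubscribe()
resetSub.unsubscribe()
```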

Let’s set up our test file.

Stage 1: state.test.ts

import { Subject } from 'rxjs'
import { Price } from './model'

// create subjects to mock out the source observables that our
// observable under test depends on
const mockPricesDto$ = new Subject<Price>()
const mockResetPrices$ = new Subject<void>()

// use doMock() rather than mock() so we can reference the
// variables containing the mock observables. mock() is hoisted
// so it does not allow referencing variables in the file scope.
// see https://vitest.dev/api/vi#vi-mock and
// https://vitest.dev/api/vi#vi-domock
vi.doMock('./service', () => ({
  pricesDto$: mockPricesDto$,
  resetPrices$: mockResetPrices$,
}))

// we need to dynamically import the observable under test
// after we call vi.doMock. see https://vitest.dev/api/vi#vi-domock
const { prices$ } = await import('./state')

describe('prices$', () => {
  // we are now ready to add our tests here
})

Now that we have the shell of the test file set up, let’s add some basic assertions. Note the usage of our spyOnObservable() utility function.

Stage 2: state.test.ts

import { Subject } from 'rxjs'
import { Price } from './model'
import { spyOnObservable } from './spyOnObservable'

// create subjects to mock out the source observables that our
// observable under test depends on
const mockPricesDto$ = new Subject<Price>()
const mockResetPrices$ = new Subject<void>()

// use doMock() rather than mock() so we can reference the
// variables containing the mock observables. mock() is hoisted
// so it does not allow referencing variables in the file scope.
// see https://vitest.dev/api/vi#vi-mock and
// https://vitest.dev/api/vi#vi-domock
vi.doMock('./service', () => ({
  pricesDto$: mockPricesDto$,
  resetPrices$: mockResetPrices$,
}))

// we need to dynamically import the observable under test
// after we call vi.doMock. see https://vitest.dev/api/vi#vi-domock
const { prices$ } = await import('./state')

describe('prices$', () => {
  // spy on the observable under test, using the spyOnObservable utility
  const { latestEmission, error, subscription } = spyOnObservable(prices$)

  // ensure we unsubscribe when we are done to avoid memory leaks
  afterAll(() => {
    subscription.unsubscribe()
  })

  it('should initially emit empty object', () => {
    expect(latestEmission()).toEqual({})
  })

  it('should not error', () => {
    expect(error).not.toBeCalled()
  })
})

And our tests are passing:

RxJS Observables: Stage 2 - Tests Passing
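The first assertion can run immediately after subscribing because `startWith()` emits its value synchronously at subscription time, before any source has produced anything. A minimal sketch of that behavior, using a hypothetical `source$` subject rather than the article's modules:

```typescript
import { Subject, startWith } from 'rxjs'

const empty: Record<string, number> = {}
const source$ = new Subject<Record<string, number>>()
const emissions: Array<Record<string, number>> = []

// subscribing triggers startWith() right away, even though
// source$ has not emitted anything yet
const subscription = source$.pipe(startWith(empty)).subscribe((value) => {
  emissions.push(value)
})

// one emission so far: the initial empty object
const countAfterSubscribe = emissions.length

source$.next({ XOM: 48.17 })
subscription.unsubscribe()
```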

So far so good. Now let’s add some assertions about what the behavior will be when the source observable pricesDto$ emits.

Stage 3: state.test.ts

import { Subject } from 'rxjs'
import { Price } from './model'
import { spyOnObservable } from './spyOnObservable'

// create subjects to mock out the source observables that our
// observable under test depends on
const mockPricesDto$ = new Subject<Price>()
const mockResetPrices$ = new Subject<void>()

// use doMock() rather than mock() so we can reference the
// variables containing the mock observables. mock() is hoisted
// so it does not allow referencing variables in the file scope.
// see https://vitest.dev/api/vi#vi-mock and
// https://vitest.dev/api/vi#vi-domock
vi.doMock('./service', () => ({
  pricesDto$: mockPricesDto$,
  resetPrices$: mockResetPrices$,
}))

// we need to dynamically import the observable under test
// after we call vi.doMock. see https://vitest.dev/api/vi#vi-domock
const { prices$ } = await import('./state')

describe('prices$', () => {
  // spy on the observable under test, using the spyOnObservable utility
  const { latestEmission, error, subscription } = spyOnObservable(prices$)

  // ensure we unsubscribe when we are done to avoid memory leaks
  afterAll(() => {
    subscription.unsubscribe()
  })

  it('should initially emit empty object', () => {
    expect(latestEmission()).toEqual({})
  })

  it('should emit object containing latest prices after pricesDto$ emits', () => {
    // call next() on the subject that mocks out the source observable
    // pricesDto$ that the observable under test depends on, to simulate
    // that observable emitting prices, and ensure the new price is
    // emitted as expected in the observable under test
    mockPricesDto$.next({ symbol: 'XOM', price: 48.17 })
    expect(latestEmission()).toEqual({ XOM: 48.17 })

    // add another instrument/price, ensure both instruments
    // appear in the resulting emission
    mockPricesDto$.next({ symbol: 'BA', price: 218.93 })
    expect(latestEmission()).toEqual({ XOM: 48.17, BA: 218.93 })

    // update the price of the first instrument, ensure the price is
    // updated in the resulting emission
    mockPricesDto$.next({ symbol: 'XOM', price: 48.21 })
    expect(latestEmission()).toEqual({ XOM: 48.21, BA: 218.93 })
  })

  it('should not error', () => {
    expect(error).not.toBeCalled()
  })
})

Great, our tests are still passing:

RxJS Observables: Stage 3 - Tests Passing
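These assertions work without `await` or timers because a `Subject` delivers each value to its subscribers synchronously, inside the call to `next()`. The sketch below illustrates the ordering with a hypothetical `log` array; it assumes no asynchronous operators or schedulers sit in the chain.

```typescript
import { Subject, map } from 'rxjs'

const log: string[] = []
const source$ = new Subject<number>()

const subscription = source$.pipe(map((n) => n * 2)).subscribe((doubled) => {
  log.push(`received ${doubled}`)
})

log.push('before next')
source$.next(21) // the subscriber runs before this call returns
log.push('after next')

subscription.unsubscribe()
// log is now ['before next', 'received 42', 'after next']
```

If a chain does include time-based operators such as `debounceTime()`, this immediacy no longer holds and the test would need fake timers or a similar mechanism.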

Now let’s add an assertion for the reset event.

Stage 4: state.test.ts

import { Subject } from 'rxjs'
import { Price } from './model'
import { spyOnObservable } from './spyOnObservable'

// create subjects to mock out the source observables that our
// observable under test depends on
const mockPricesDto$ = new Subject<Price>()
const mockResetPrices$ = new Subject<void>()

// use doMock() rather than mock() so we can reference the
// variables containing the mock observables. mock() is hoisted
// so it does not allow referencing variables in the file scope.
// see https://vitest.dev/api/vi#vi-mock and
// https://vitest.dev/api/vi#vi-domock
vi.doMock('./service', () => ({
  pricesDto$: mockPricesDto$,
  resetPrices$: mockResetPrices$,
}))

// we need to dynamically import the observable under test
// after we call vi.doMock. see https://vitest.dev/api/vi#vi-domock
const { prices$ } = await import('./state')

describe('prices$', () => {
  // spy on the observable under test, using the spyOnObservable utility
  const { latestEmission, error, subscription } = spyOnObservable(prices$)

  // ensure we unsubscribe when we are done to avoid memory leaks
  afterAll(() => {
    subscription.unsubscribe()
  })

  it('should initially emit empty object', () => {
    expect(latestEmission()).toEqual({})
  })

  it('should emit object containing latest prices after pricesDto$ emits', () => {
    // call next() on the subject that mocks out the source observable
    // pricesDto$ that the observable under test depends on, to simulate
    // that observable emitting prices, and ensure the new price is
    // emitted as expected in the observable under test
    mockPricesDto$.next({ symbol: 'XOM', price: 48.17 })
    expect(latestEmission()).toEqual({ XOM: 48.17 })

    // add another instrument/price, ensure both instruments
    // appear in the resulting emission
    mockPricesDto$.next({ symbol: 'BA', price: 218.93 })
    expect(latestEmission()).toEqual({ XOM: 48.17, BA: 218.93 })

    // update the price of the first instrument, ensure the price is
    // updated in the resulting emission
    mockPricesDto$.next({ symbol: 'XOM', price: 48.21 })
    expect(latestEmission()).toEqual({ XOM: 48.21, BA: 218.93 })
  })

  it('should emit empty object after resetPrices$ emits', () => {
    // call next() on the subject that mocks out the source observable
    // resetPrices$ that the observable under test depends on, to simulate
    // that observable emitting, and ensure that the prices lookup table
    // is reset to an empty object
    mockResetPrices$.next()
    expect(latestEmission()).toEqual({})
  })

  it('should not error', () => {
    expect(error).not.toBeCalled()
  })
})

Great, the tests pass yet again.

RxJS Observables: Stage 4 - Tests Passing

Now let’s add an assertion that on the next prices that flow in after a reset, we only have the new prices in the lookup table.

Stage 5: state.test.ts

import { Subject } from 'rxjs'
import { Price } from './model'
import { spyOnObservable } from './spyOnObservable'

// create subjects to mock out the source observables that our
// observable under test depends on
const mockPricesDto$ = new Subject<Price>()
const mockResetPrices$ = new Subject<void>()

// use doMock() rather than mock() so we can reference the
// variables containing the mock observables. mock() is hoisted
// so it does not allow referencing variables in the file scope.
// see https://vitest.dev/api/vi#vi-mock and
// https://vitest.dev/api/vi#vi-domock
vi.doMock('./service', () => ({
  pricesDto$: mockPricesDto$,
  resetPrices$: mockResetPrices$,
}))

// we need to dynamically import the observable under test
// after we call vi.doMock. see https://vitest.dev/api/vi#vi-domock
const { prices$ } = await import('./state')

describe('prices$', () => {
  // spy on the observable under test, using the spyOnObservable utility
  const { latestEmission, error, subscription } = spyOnObservable(prices$)

  // ensure we unsubscribe when we are done to avoid memory leaks
  afterAll(() => {
    subscription.unsubscribe()
  })

  it('should initially emit empty object', () => {
    expect(latestEmission()).toEqual({})
  })

  it('should emit object containing latest prices after pricesDto$ emits', () => {
    // call next() on the subject that mocks out the source observable
    // pricesDto$ that the observable under test depends on, to simulate
    // that observable emitting prices, and ensure the new price is
    // emitted as expected in the observable under test
    mockPricesDto$.next({ symbol: 'XOM', price: 48.17 })
    expect(latestEmission()).toEqual({ XOM: 48.17 })

    // add another instrument/price, ensure both instruments
    // appear in the resulting emission
    mockPricesDto$.next({ symbol: 'BA', price: 218.93 })
    expect(latestEmission()).toEqual({ XOM: 48.17, BA: 218.93 })

    // update the price of the first instrument, ensure the price is
    // updated in the resulting emission
    mockPricesDto$.next({ symbol: 'XOM', price: 48.21 })
    expect(latestEmission()).toEqual({ XOM: 48.21, BA: 218.93 })
  })

  it('should emit empty object after resetPrices$ emits', () => {
    // call next() on the subject that mocks out the source observable
    // resetPrices$ that the observable under test depends on, to simulate
    // that observable emitting, and ensure that the prices lookup table
    // is reset to an empty object
    mockResetPrices$.next()
    expect(latestEmission()).toEqual({})
  })

  it('should emit object containing only the latest prices after pricesDto$ emits', () => {
    mockPricesDto$.next({ symbol: 'HD', price: 332.12 })
    mockPricesDto$.next({ symbol: 'AA', price: 24.49 })
    expect(latestEmission()).toEqual({ HD: 332.12, AA: 24.49 })
  })

  it('should not error', () => {
    expect(error).not.toBeCalled()
  })
})

…and we have a failing test:

RxJS Observables: Stage 5 - Tests Failing
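The failure can be reproduced outside the test runner by replaying the Stage 1 chain by hand. In the sketch below, subjects stand in for the service module and `stage1Prices$` is a hypothetical local copy of the chain with the same operator order.

```typescript
import { Observable, Subject, map, mergeWith, scan, startWith } from 'rxjs'

interface Price {
  symbol: string
  price: number
}

const pricesDto$ = new Subject<Price>()
const resetPrices$ = new Subject<void>()
const empty: Record<string, number> = {}

// same operator order as Stage 1: the reset is merged in AFTER scan()
const stage1Prices$: Observable<Record<string, number>> = pricesDto$.pipe(
  scan(
    (accum, current) => ({ ...accum, [current.symbol]: current.price }),
    empty
  ),
  mergeWith(resetPrices$.pipe(map(() => empty))),
  startWith(empty)
)

const emissions: Array<Record<string, number>> = []
const subscription = stage1Prices$.subscribe((value) => {
  emissions.push(value)
})

pricesDto$.next({ symbol: 'XOM', price: 48.21 })
resetPrices$.next() // emits {} downstream, but scan() keeps its accumulator
pricesDto$.next({ symbol: 'HD', price: 332.12 })
subscription.unsubscribe()

const latest = emissions[emissions.length - 1]
// latest still contains XOM: { XOM: 48.21, HD: 332.12 }
```

The reset does produce an empty object, but the very next price update brings the old entries back.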

It seems our test has uncovered a bug in our implementation! We want the state to be reset to an empty object when the reset observable emits. But the accumulator object in the reducer function that we pass to scan() retains the state even after the reset. Let’s fix that by moving the mergeWith() up before the scan(), and adding a conditional within the reducer function to replace the state with an empty object in the event of a reset. (In this case, the value flowing into scan() will be falsy rather than a Price object.)

Stage 6: state.ts

import { Observable, mergeWith, scan, startWith } from 'rxjs'
import { pricesDto$, resetPrices$ } from './service'
import { Price } from './model'

export const prices$: Observable<Record<string, number>> = pricesDto$.pipe(
  mergeWith(resetPrices$),
  scan(
    (accum, current: Price | void) =>
      !current
        ? {}
        : {
            ...accum,
            [current.symbol]: current.price,
          },
    {}
  ),
  startWith({})
)

Our tests are now passing again, giving us confidence that our observation chain does what we want it to.

RxJS Observables: Stage 6 - Tests Passing
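One optional refinement is to pull the reducer out as a named pure function, so that the reset branch can also be checked without any observables. This is a sketch only: `reducePrices` and `PriceTable` are hypothetical names, and the `Price` shape is assumed from the test data above.

```typescript
interface Price {
  symbol: string
  price: number
}

type PriceTable = Record<string, number>

// Hypothetical named version of the Stage 6 reducer: a falsy `current`
// represents a reset, anything else is a price update
function reducePrices(accum: PriceTable, current: Price | void): PriceTable {
  return !current ? {} : { ...accum, [current.symbol]: current.price }
}

// replay a sequence of inputs the way scan() would
const inputs: Array<Price | void> = [
  { symbol: 'XOM', price: 48.21 },
  undefined, // reset
  { symbol: 'HD', price: 332.12 },
]
const finalTable = inputs.reduce<PriceTable>(
  (accum, current) => reducePrices(accum, current),
  {}
)
// finalTable is { HD: 332.12 }
```

The same function could then be passed to `scan()` in place of the inline arrow function.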

Unit Testing RxJS Observation Chains: Solution Recap

The spyOnObservable utility provides an easy-to-use API for spying on observables to facilitate asserting on their behavior. This utility, paired with mocking the upstream observables with RxJS subjects, defines a scalable pattern for writing readable unit tests that simulate how observation chains behave in the real world, given various inputs via the upstream source observables.

Feel free to copy spyOnObservable, adapt it to your needs as you see fit, and use it without attribution.

Further Resources

If you’re interested in learning more about how to implement reactive programming principles, please see Reactive Trader® and its open source codebase. Reactive Trader® is Adaptive’s real-time foreign exchange (FX) and credit trading platform designed to showcase reactive programming principles across the full application stack.

If you'd like to learn about React-RxJS, a library that provides bindings to integrate RxJS observables with React, please see our 2020 post Why React RxJS.

Appendix A: Alternative Solutions Considered for Unit Testing RxJS Observation Chains

Marble testing is well suited for testing RxJS operators (such as map() and first()), i.e. factory functions that create an operator function. In fact, the unit tests for the operators within the RxJS code base are marble tests. Marble testing can also be useful when testing a function that accepts an observable and returns another observable.

For our use case of testing stateful observation chains, however, marble testing does not seem to be a good fit, because marble tests assert on the entire resulting stream of emissions. We wanted to be able to test what gets emitted step by step, interactively, so that we can verify the expected result after each emission by the source observables.
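For comparison, a minimal marble test of a single operator might look like the sketch below. It uses `TestScheduler` from `rxjs/testing`; the equality check is a simple `JSON.stringify` comparison chosen to keep the example free of any test framework, and `marbleTestRan` is a hypothetical flag added only to show that the callback executed.

```typescript
import { map } from 'rxjs'
import { TestScheduler } from 'rxjs/testing'

// the scheduler needs to be told how to compare actual and expected
// frames; a test framework's deep-equality assertion would normally go here
const testScheduler = new TestScheduler((actual, expected) => {
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error('marble assertion failed')
  }
})

let marbleTestRan = false

testScheduler.run(({ cold, expectObservable }) => {
  // each '-' is one frame of virtual time, letters are emissions,
  // and '|' is completion
  const source$ = cold('-a-b-|', { a: 1, b: 2 })
  const result$ = source$.pipe(map((n) => n * 10))

  // the assertion covers the whole stream at once
  expectObservable(result$).toBe('-a-b-|', { a: 10, b: 20 })
  marbleTestRan = true
})
```

Note how the expectation describes the complete output stream in one string, which is the property that made this style awkward for our step-by-step assertions.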

Another approach we briefly explored is subscribing to the observable and making test assertions in the observer (e.g. the next handler passed to subscribe()). This works poorly for observables that emit more than one value: it only works when you know the observable will complete, and it requires boilerplate to resolve the promise of the async test when the observable completes. You can work around this to some extent by forcing completion with an operator such as take() (although you need to know exactly how many emissions you are concerned with), along with toArray() to examine the multiple values emitted. We did not find this approach satisfactory.
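A sketch of the take() plus toArray() workaround is shown below. It uses a synchronous `from()` source with made-up price data so that the block runs on its own; with a real asynchronous source, the subscriber would also have to signal the test framework that the test is done.

```typescript
import { from, scan, take, toArray } from 'rxjs'

const empty: Record<string, number> = {}
let collected: Array<Record<string, number>> = []

from([
  { symbol: 'XOM', price: 48.17 },
  { symbol: 'BA', price: 218.93 },
  { symbol: 'XOM', price: 48.21 },
])
  .pipe(
    scan(
      (accum, current) => ({ ...accum, [current.symbol]: current.price }),
      empty
    ),
    take(2), // we must know up front how many emissions we care about
    toArray() // buffers the emissions and emits them once, on completion
  )
  .subscribe((all) => {
    // assertions can only run here, after the stream has completed
    collected = all
  })

// collected is now [{ XOM: 48.17 }, { XOM: 48.17, BA: 218.93 }]
```

The third price never reaches the assertions because take(2) has already completed the stream, which illustrates why the emission count has to be known in advance.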

Author: Bruce Harris
