Historical time series data with Rx

Here at Adaptive, we get asked fairly regularly how would we replay historical data with Rx. The short answer is we would use virtual time via the Rx TestScheduler.

Generally, that answer is met with a blank stare. So to give a more detailed response, I thought I would show you how you can do this at home.

This post assumes some knowledge of Rx and the IObservable<T> and IEnumerable<T> types. For readers not familiar with .NET, the IEnumerable<T> type can be considered analogous to the Iterable interface in Java.

Use cases

There are several use case for developers wanting to be able to replay time series data.
Some include:

  1. Developing with local test data (on disk) instead of integrating with another system.
  2. Debugging a failure from Prod.
  3. Performing various stress and performance tests with known data.
  4. Demonstrating a product in a lifelike manner e.g. demoing a prototype to a prospective client, presenting at a conference

Quick wins

If you want to just get the data and process it, then you can do this in a conventional way. You would read from disk the data and process one row/event at a time. However, this would just give you the final result of the processing. Often what people are looking for is to see each state change along the way, just as their application would process it if the data was real time, not just replayed.

So instead of seeing the end of day chart, they may want to see the chart unfold as it would with the day's events. Another scenario might be to see the effect of an Order Book being filled with orders and being processed by a matching engine. These are always more interesting with some visual candy to animate the experience.

The problem space

Initially, the problem seems to be getting data that is IEnumerable<T> and making it appear to be IObservable<T> data. However, this is overly simplistic. If that was the sum of the problem, then just using .ToObservable() would be a suitable solution.

The real solution would need to ensure that data is replayed at a cadence that reflects that of the original data sequence. If you were to use the .ToObservable() extension method then all the data would be pushed to the subscriber as fast as possible, ignoring the periods of silence in the original sequence. Instead, we want to be able to replicate not only the data from the sequence but the timeliness of the delivery of that data.

What is required

The data

First and foremost we will need to have some data. Not only do we need the actual event we would like to replay, but we also need to to have some time-stamp metadata. This information can either be the actual time the data was published, or it can be the period since the last notification. Either will work.

Once you have the data, you will want to parse that data into your model. If the time-stamp data is not part of your model, then you can add it as metadata with either the Reactive.Timestamped<T> or Reactive.TimeInterval<T> envelope types.

For our example we are going to source data from Yahoo. Using their chart API, we get the last day of Apple stock prices via this URI. This returns us data like the following:


    uri:/instrument/1.0/AAPL/chartdata;type=quote;range=1d/csv
    ticker:aapl
    Company-Name:Apple Inc.
    Exchange-Name:NMS
    unit:MIN
    timezone:EDT
    currency:USD
    gmtoffset:-14400
    previous_close:126.0000
    Timestamp:1436275800,1436299200
    labels:1436277600,1436281200,1436284800,1436288400,1436292000,1436295600,1436299200
    values:Timestamp,close,high,low,open,volume
    close:123.8600,126.0600
    high:123.9100,126.1500
    low:123.7700,125.8900
    open:123.8600,126.0700
    volume:0,1523700
    1436275859,125.8001,126.0000,125.8000,125.9600,566200
    1436275860,125.8700,125.9900,125.7700,125.8000,193100
    1436275979,125.9220,125.9600,125.8100,125.8800,132000
    1436276039,126.0600,126.0700,125.8900,125.9200,201000
    1436276099,125.8311,126.1500,125.8200,126.0700,249400
    1436276101,125.8500,125.9000,125.7800,125.8300,142200

The parser

We then parse the data in a rather simplistic way using Regex pattern matching.


    ^(?d{10}),(?d+(.d+)),(?d+(.d+)),(?d+(.d+)),(?d+(.d+)),(?d+)$

Ah regex, gotta love them. Right?

Next we can just enumerate through all of the matching lines. This also has the benefit of automatically excluding the header information rows. We can then simply map this to our model.


    public static IEnumerable FromYahooFinanceCsv(string payload)
    {
        var regex = new Regex(@"^(?d{10}),(?d+(.d+)),(?d+(.d+)),(?d+(.d+)),(?d+(.d+)),(?d+)$", RegexOptions.Multiline);
        var matches = regex.Matches(payload);
        foreach (Match match in matches)
        {
            var tick = new OHLC
            {
                Open=Decimal.Parse(match.Groups["open"].Value),
                High = Decimal.Parse(match.Groups["high"].Value),
                Low = Decimal.Parse(match.Groups["low"].Value),
                Close = Decimal.Parse(match.Groups["close"].Value),
                Volume = long.Parse(match.Groups["volume"].Value),
                Timestamp = FromUnixTime(long.Parse(match.Groups["timestamp"].Value))
            };
            yield return tick;
        }
    }

This gets us to our IEnumerable<T> point. We have data, and we now have mapped it to our own model. Next we need to upgrade the model from IEnumerable<T> to IObservable<T>. This is where we need to start thinking about time and how it applies to the observable sequence.

The playback

In our data-set, we have time information in the form of a time-stamp. This time-stamp represents when the data was recorded. We can use this to schedule the delivery of our events. We could use one of the standard Rx schedulers, but instead we will leverage the virtual time capabilities of the TestScheduler.

In this example we want the first row of data to be published immediately. To do this will will use the time-stamp from the first row as the start time of the sequence. For each row we take the delta between this start time and the row's time-stamp to get the row's offset. We then create an OnNext Notification with the OHLC data, and then wrap that in a Recorded envelope with the time offset. We use the static helper method ReactiveTest.OnNext to do this.


    var ticks = FromYahooFinanceCsv(csvContent).ToArray();
    var startTime = ticks.First().Timestamp;
    var notifications = ticks.Select (t => ReactiveTest.OnNext(t.Timestamp.Ticks-startTime.Ticks, t));

Now we have created an IEnumerable sequence of notifications with time metadata, we can now create the observable sequence. The TestScheduler allows you to create pre-canned observable sequences by providing an array of Recorded<Notification<T>>, which is what we have.


    var scheduler = new TestScheduler();
    var notifications = CreateSequence(scheduler);
    var observableSequence = scheduler.CreateHotObservable(notifications);

Now we have an observable sequence where the values are produced in the virtual time of our TestScheduler. As we move the virtual clock forward, events will be published from the observable sequence to its subscribers.


    var scheduler = new TestScheduler();
    var notifications = CreateSequence(scheduler);
    var observableSequence = scheduler.CreateHotObservable(notifications);
        
    observableSequence.Dump();
    
    scheduler.Start();

In this example we use the TestScheduler.Start(), however this runs the clock at full speed so the query completes in 50ms. This is not exactly what we want. We would prefer the data to be pushed at the same cadence as the metadata time-stamps would suggest. To do this we will need to write a little Rx query to bind virtual time to clock time.

Here we replace the Start call with


    var increment = TimeSpan.FromMilliseconds(100);
    using(Observable.Interval(increment)
        .Subscribe(_=>scheduler.AdvanceBy(increment.Ticks)))
    {
        Console.ReadLine();
    }

Now we have the pleasure of having the sequence unfold at the same time cadence as it did in reality. Now as the data covers an entire trading day, this could take some time.

Replaying data at pace

As the above query would take hours to complete, let's speed up our virtual clock. We can do this simply.


    var speed = 500L;
    var increment = TimeSpan.FromMilliseconds(100);
    using(Observable.Interval(increment)
        .Subscribe(_=>scheduler.AdvanceBy((long)increment.Ticks*speed)))
    {
        Console.ReadLine();
    }

The full .NET code sample is below.


    void Main()
    {
        var scheduler = new TestScheduler();
        var notifications = CreateSequence(scheduler);
        var startTime = notifications.First().Value.Value.Timestamp;    
        var observableSequence = scheduler.CreateHotObservable(notifications);
            
        observableSequence.Dump();
        
        var speed = 500L;
        var increment = TimeSpan.FromMilliseconds(100);
        using(Observable.Interval(increment)
            .Subscribe(_=>scheduler.AdvanceBy((long)increment.Ticks*speed)))
        {
            Console.ReadLine();
        }
    }
    
    // Define other methods and classes here
    public static Recorded<Notification>[] CreateSequence(TestScheduler scheduler)
    {
        var currentDir = Path.GetDirectoryName(LINQPad.Util.CurrentQueryPath);
        var filePath = Path.Combine(currentDir, @"ResourcesYahooFinance_AAPL.csv");
        var csvContent = File.ReadAllText(filePath);
        var ticks = FromYahooFinanceCsv(csvContent).ToArray();
        
        var startTime = ticks.First().Timestamp;
        var endTime = ticks.Last().Timestamp;
        
        var notifications = ticks.Select (t => ReactiveTest.OnNext(t.Timestamp.Ticks-startTime.Ticks, t))
            .Concat(new[]{ReactiveTest.OnCompleted(endTime.Ticks - startTime.Ticks)})
            .ToArray();
        return notifications;
    }
    
    public static IEnumerable FromYahooFinanceCsv(string payload)
    {
        var regex = new Regex(@"^(?d{10}),(?d+(.d+)),(?d+(.d+)),(?d+(.d+)),(?d+(.d+)),(?d+)$", RegexOptions.Multiline);
        var matches = regex.Matches(payload);
        foreach (Match match in matches)
        {
            var tick = new OHLC
            {
                Open=Decimal.Parse(match.Groups["open"].Value),
                High = Decimal.Parse(match.Groups["high"].Value),
                Low = Decimal.Parse(match.Groups["low"].Value),
                Close = Decimal.Parse(match.Groups["close"].Value),
                Volume = long.Parse(match.Groups["volume"].Value),
                Timestamp = FromUnixTime(long.Parse(match.Groups["timestamp"].Value))
            };
            yield return tick;
        }
    }
    
    public static DateTime FromUnixTime(long unixTime)
    {
        var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        return epoch.AddSeconds(unixTime);
    }
    
    public class OHLC
    {
        public decimal Open { get; set; }
        public decimal High { get; set; }
        public decimal Low { get; set; }
        public decimal Close { get; set; }
        public long Volume { get; set; }
        public DateTimeOffset Timestamp { get; set; }
    }

This is a LinqPad sample but could be adapted to a console application by replacing the .Dump() extension method with


    .Subscribe(
        x=>Console.WriteLine(x), 
        ex=>Console.WriteLine(ex),
        ()=>Console.WriteLine("--Completed--"));`



×

Contact us

By pressing "Send" I agree that I am happy to be contacted by Adaptive Financial Consulting. I can unsubscribe at any time. You can read more about our privacy policy here.

×

I would like to read the full story

By completing the below form I confirm I am happy to be contacted by Adaptive Financial Consulting Ltd. I can unsubscribe at anytime.