Here at Adaptive, we are asked fairly regularly how we would replay historical data with Rx. The short answer is that we would use virtual time via the Rx TestScheduler.
Generally, that answer is met with a blank stare. So to give a more detailed response, I thought I would show you how you can do this at home.
This post assumes some knowledge of Rx and the IObservable<T> and IEnumerable<T> types. For readers not familiar with .NET, the IEnumerable<T> type can be considered analogous to the Iterable interface in Java.
Use cases
There are several use cases for developers wanting to replay time series data.
Some include:
- Developing with local test data (on disk) instead of integrating with another system.
- Debugging a failure from Prod.
- Performing various stress and performance tests with known data.
- Demonstrating a product in a lifelike manner, e.g. demoing a prototype to a prospective client or presenting at a conference.
Quick wins
If you just want to get the data and process it, you can do this in a conventional way: read the data from disk and process one row/event at a time. However, this only gives you the final result of the processing. Often what people are looking for is to see each state change along the way, just as their application would process it if the data were real time, not just replayed.
So instead of seeing the end of day chart, they may want to see the chart unfold as it would with the day's events. Another scenario might be to see the effect of an Order Book being filled with orders and being processed by a matching engine. These are always more interesting with some visual candy to animate the experience.
The problem space
Initially, the problem seems to be getting data that is IEnumerable<T> and making it appear to be IObservable<T> data. However, this is overly simplistic. If that were the sum of the problem, then just using .ToObservable() would be a suitable solution.
The real solution needs to ensure that data is replayed at a cadence that reflects that of the original data sequence. If you were to use the .ToObservable() extension method, all the data would be pushed to the subscriber as fast as possible, ignoring the periods of silence in the original sequence. Instead, we want to replicate not only the data from the sequence but also the timeliness of the delivery of that data.
What is required
The data
First and foremost we will need some data. Not only do we need the actual events we would like to replay, but we also need some time-stamp metadata. This information can either be the actual time the data was published, or it can be the period since the last notification. Either will work.
Once you have the data, you will want to parse it into your model. If the time-stamp data is not part of your model, then you can add it as metadata with either the System.Reactive.Timestamped<T> or System.Reactive.TimeInterval<T> envelope types.
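As a small illustration of the two kinds of time metadata, the sketch below wraps plain values in the Timestamped<T> envelope and then derives the equivalent TimeInterval<T> form. The names EnvelopeSketch, WithTimestamps and ToIntervals are hypothetical, the start time is arbitrary, and the code assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive; // Timestamped<T> and TimeInterval<T> live here

public static class EnvelopeSketch
{
    // Pair each value with the absolute time at which it was recorded.
    public static IEnumerable<Timestamped<T>> WithTimestamps<T>(
        IEnumerable<T> values, IEnumerable<DateTimeOffset> recordedAt)
    {
        return values.Zip(recordedAt, (value, at) => new Timestamped<T>(value, at));
    }

    // Re-express absolute time-stamps as the period since the previous item.
    // The first item gets an interval of zero.
    public static IEnumerable<TimeInterval<T>> ToIntervals<T>(
        IEnumerable<Timestamped<T>> source)
    {
        DateTimeOffset? previous = null;
        foreach (var item in source)
        {
            var gap = previous.HasValue ? item.Timestamp - previous.Value : TimeSpan.Zero;
            previous = item.Timestamp;
            yield return new TimeInterval<T>(item.Value, gap);
        }
    }

    public static void Main()
    {
        // An arbitrary start time, used only for illustration.
        var start = new DateTimeOffset(2015, 7, 7, 13, 30, 59, TimeSpan.Zero);
        var prices = new[] { 125.8001m, 125.8700m, 125.9220m };
        var times = new[] { start, start.AddSeconds(1), start.AddSeconds(120) };

        foreach (var item in ToIntervals(WithTimestamps(prices, times)))
        {
            Console.WriteLine($"{item.Value} after {item.Interval}");
        }
    }
}
```

Either envelope carries enough information to rebuild the other, which is why both forms of metadata work for replay.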
For our example we are going to source data from Yahoo. Using their chart API, we get the last day of Apple stock prices via this URI. This returns us data like the following:
uri:/instrument/1.0/AAPL/chartdata;type=quote;range=1d/csv
ticker:aapl
Company-Name:Apple Inc.
Exchange-Name:NMS
unit:MIN
timezone:EDT
currency:USD
gmtoffset:-14400
previous_close:126.0000
Timestamp:1436275800,1436299200
labels:1436277600,1436281200,1436284800,1436288400,1436292000,1436295600,1436299200
values:Timestamp,close,high,low,open,volume
close:123.8600,126.0600
high:123.9100,126.1500
low:123.7700,125.8900
open:123.8600,126.0700
volume:0,1523700
1436275859,125.8001,126.0000,125.8000,125.9600,566200
1436275860,125.8700,125.9900,125.7700,125.8000,193100
1436275979,125.9220,125.9600,125.8100,125.8800,132000
1436276039,126.0600,126.0700,125.8900,125.9200,201000
1436276099,125.8311,126.1500,125.8200,126.0700,249400
1436276101,125.8500,125.9000,125.7800,125.8300,142200
The parser
We then parse the data in a rather simplistic way using Regex pattern matching.
^(?<timestamp>\d{10}),(?<close>\d+(\.\d+)),(?<high>\d+(\.\d+)),(?<low>\d+(\.\d+)),(?<open>\d+(\.\d+)),(?<volume>\d+)$
Ah regex, gotta love them. Right?
Next we can just enumerate through all of the matching lines. This also has the benefit of automatically excluding the header information rows. We can then simply map this to our model.
public static IEnumerable<OHLC> FromYahooFinanceCsv(string payload)
{
    // Only the data rows match this pattern, so the header rows are skipped.
    // Column order follows the "values:" header: Timestamp,close,high,low,open,volume.
    var regex = new Regex(@"^(?<timestamp>\d{10}),(?<close>\d+(\.\d+)),(?<high>\d+(\.\d+)),(?<low>\d+(\.\d+)),(?<open>\d+(\.\d+)),(?<volume>\d+)$", RegexOptions.Multiline);
    var matches = regex.Matches(payload);
    foreach (Match match in matches)
    {
        // Map the named groups of each matching row onto our model.
        var tick = new OHLC
        {
            Open = Decimal.Parse(match.Groups["open"].Value),
            High = Decimal.Parse(match.Groups["high"].Value),
            Low = Decimal.Parse(match.Groups["low"].Value),
            Close = Decimal.Parse(match.Groups["close"].Value),
            Volume = long.Parse(match.Groups["volume"].Value),
            Timestamp = FromUnixTime(long.Parse(match.Groups["timestamp"].Value))
        };
        yield return tick;
    }
}
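To check the claim that header rows are excluded automatically, here is a minimal sketch that counts the matching lines in a few rows taken from the sample above. The pattern is written with the named groups the parser reads (timestamp, close, high, low, open, volume), in the column order given by the "values:" header; treat that exact form as an assumption. ParserSketch and CountRows are hypothetical names.

```csharp
using System;
using System.Text.RegularExpressions;

public static class ParserSketch
{
    // A 10-digit Unix time-stamp, four decimal prices, then an integer volume.
    static readonly Regex Row = new Regex(
        @"^(?<timestamp>\d{10}),(?<close>\d+(\.\d+)),(?<high>\d+(\.\d+)),(?<low>\d+(\.\d+)),(?<open>\d+(\.\d+)),(?<volume>\d+)$",
        RegexOptions.Multiline);

    // Number of lines in the payload that look like data rows.
    public static int CountRows(string payload)
    {
        return Row.Matches(payload).Count;
    }

    public static void Main()
    {
        // Three header rows followed by two data rows from the sample.
        var payload = string.Join("\n",
            "Timestamp:1436275800,1436299200",
            "values:Timestamp,close,high,low,open,volume",
            "volume:0,1523700",
            "1436275859,125.8001,126.0000,125.8000,125.9600,566200",
            "1436275860,125.8700,125.9900,125.7700,125.8000,193100");

        Console.WriteLine(CountRows(payload)); // prints 2
    }
}
```

The header rows fail to match because none of them starts with ten digits followed by five comma-separated fields.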
This gets us to our IEnumerable<T> point. We have data, and we have now mapped it to our own model. Next we need to upgrade the model from IEnumerable<T> to IObservable<T>. This is where we need to start thinking about time and how it applies to the observable sequence.
The playback
In our data-set, we have time information in the form of a time-stamp. This time-stamp represents when the data was recorded. We can use this to schedule the delivery of our events. We could use one of the standard Rx schedulers, but instead we will leverage the virtual time capabilities of the TestScheduler.
In this example we want the first row of data to be published immediately. To do this we will use the time-stamp from the first row as the start time of the sequence. For each row we take the delta between this start time and the row's time-stamp to get the row's offset. We then create an OnNext Notification with the OHLC data, and wrap that in a Recorded envelope with the time offset. We use the static helper method ReactiveTest.OnNext to do this.
var ticks = FromYahooFinanceCsv(csvContent).ToArray();
var startTime = ticks.First().Timestamp;
var notifications = ticks.Select (t => ReactiveTest.OnNext(t.Timestamp.Ticks-startTime.Ticks, t));
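To make the offsets concrete, the sketch below works them out for the first three time-stamps in the sample data. Virtual time on the TestScheduler is measured in ticks (100 ns units), which is why the snippet above subtracts Ticks values. OffsetSketch and OffsetsInTicks are hypothetical names, and the input is plain Unix seconds rather than the OHLC model.

```csharp
using System;
using System.Linq;

public static class OffsetSketch
{
    // Each row's offset is its distance from the first row, expressed in ticks.
    public static long[] OffsetsInTicks(long[] unixSeconds)
    {
        var start = unixSeconds[0];
        return unixSeconds
            .Select(s => (s - start) * TimeSpan.TicksPerSecond)
            .ToArray();
    }

    public static void Main()
    {
        // The first three time-stamps from the sample data.
        var offsets = OffsetsInTicks(new[] { 1436275859L, 1436275860L, 1436275979L });
        foreach (var offset in offsets)
        {
            Console.WriteLine($"{offset} ticks = {TimeSpan.FromTicks(offset)}");
        }
        // 0 ticks = 00:00:00
        // 10000000 ticks = 00:00:01
        // 1200000000 ticks = 00:02:00
    }
}
```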
Now that we have created an IEnumerable<T> sequence of notifications with time metadata, we can create the observable sequence. The TestScheduler allows you to create pre-canned observable sequences by providing an array of Recorded<Notification<T>>, which is what we have.
var scheduler = new TestScheduler();
var notifications = CreateSequence(scheduler);
var observableSequence = scheduler.CreateHotObservable(notifications);
Now we have an observable sequence where the values are produced in the virtual time of our TestScheduler. As we move the virtual clock forward, events will be published from the observable sequence to its subscribers.
var scheduler = new TestScheduler();
var notifications = CreateSequence(scheduler);
var observableSequence = scheduler.CreateHotObservable(notifications);
observableSequence.Dump();
scheduler.Start();
In this example we use TestScheduler.Start(); however, this runs the clock at full speed, so the query completes in 50ms. This is not exactly what we want. We would prefer the data to be pushed at the cadence that the metadata time-stamps suggest. To do this we will need to write a little Rx query to bind virtual time to clock time.
Here we replace the Start call with:
var increment = TimeSpan.FromMilliseconds(100);
using (Observable.Interval(increment)
    .Subscribe(_ => scheduler.AdvanceBy(increment.Ticks)))
{
    Console.ReadLine();
}
Now we have the pleasure of watching the sequence unfold at the same cadence as it did in reality. As the data covers an entire trading day, this could take some time.
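To see what moving the virtual clock does without a timer involved, here is a minimal sketch that steps a TestScheduler by hand and records how many values have arrived after each step. It assumes the Microsoft.Reactive.Testing package is referenced. The name VirtualClockSketch and the values "a", "b" and "c" are hypothetical stand-ins for real rows.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Reactive.Testing;

public static class VirtualClockSketch
{
    // Returns the number of values delivered after each clock movement.
    public static int[] Run()
    {
        var scheduler = new TestScheduler();

        // Three pre-canned notifications at 1 s, 2 s and 120 s of virtual time.
        var source = scheduler.CreateHotObservable(
            ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks, "a"),
            ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks, "b"),
            ReactiveTest.OnNext(TimeSpan.FromSeconds(120).Ticks, "c"));

        var received = new List<string>();
        var counts = new List<int>();

        using (source.Subscribe(x => received.Add(x)))
        {
            // Nothing is delivered until the virtual clock passes a due time.
            scheduler.AdvanceTo(TimeSpan.FromSeconds(1.5).Ticks);
            counts.Add(received.Count); // "a" has arrived

            scheduler.AdvanceTo(TimeSpan.FromSeconds(60).Ticks);
            counts.Add(received.Count); // "b" has arrived; "c" is still pending

            scheduler.AdvanceTo(TimeSpan.FromSeconds(120).Ticks);
            counts.Add(received.Count); // "c" has arrived
        }

        return counts.ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints 1, 2, 3
    }
}
```

The timer-driven query above does the same thing, except that a wall-clock interval decides when each step happens.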
Replaying data at pace
As the above query would take hours to complete, let's speed up our virtual clock. We can do this simply.
var speed = 500L;
var increment = TimeSpan.FromMilliseconds(100);
using (Observable.Interval(increment)
    .Subscribe(_ => scheduler.AdvanceBy(increment.Ticks * speed)))
{
    Console.ReadLine();
}
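As a rough check of what a speed factor buys you, the sketch below divides the recorded span by the speed. It uses the Timestamp range from the sample header (1436275800 to 1436299200, which is 23,400 seconds or 6.5 hours) and ignores timer jitter. ReplayDurationSketch and ReplayDuration are hypothetical names.

```csharp
using System;

public static class ReplayDurationSketch
{
    // Approximate wall-clock time needed to replay `recorded` worth of data
    // when the virtual clock runs `speed` times faster than real time.
    public static TimeSpan ReplayDuration(TimeSpan recorded, long speed)
    {
        return TimeSpan.FromTicks(recorded.Ticks / speed);
    }

    public static void Main()
    {
        // The Timestamp range from the sample header.
        var recorded = TimeSpan.FromSeconds(1436299200L - 1436275800L);

        Console.WriteLine(ReplayDuration(recorded, 1));   // 06:30:00
        Console.WriteLine(ReplayDuration(recorded, 500)); // 00:00:46.8000000
    }
}
```

So at a speed of 500 the full trading day plays back in a little under a minute.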
The full .NET code sample is below.
void Main()
{
    var scheduler = new TestScheduler();
    var notifications = CreateSequence(scheduler);
    var startTime = notifications.First().Value.Value.Timestamp;
    var observableSequence = scheduler.CreateHotObservable(notifications);
    observableSequence.Dump();
    // Advance virtual time 500x faster than the wall clock.
    var speed = 500L;
    var increment = TimeSpan.FromMilliseconds(100);
    using (Observable.Interval(increment)
        .Subscribe(_ => scheduler.AdvanceBy(increment.Ticks * speed)))
    {
        Console.ReadLine();
    }
}
// Define other methods and classes here
public static Recorded<Notification<OHLC>>[] CreateSequence(TestScheduler scheduler)
{
    var currentDir = Path.GetDirectoryName(LINQPad.Util.CurrentQueryPath);
    var filePath = Path.Combine(currentDir, @"Resources\YahooFinance_AAPL.csv");
    var csvContent = File.ReadAllText(filePath);
    var ticks = FromYahooFinanceCsv(csvContent).ToArray();
    var startTime = ticks.First().Timestamp;
    var endTime = ticks.Last().Timestamp;
    // Offset every row from the first time-stamp, then complete at the last one.
    var notifications = ticks.Select(t => ReactiveTest.OnNext(t.Timestamp.Ticks - startTime.Ticks, t))
        .Concat(new[] { ReactiveTest.OnCompleted<OHLC>(endTime.Ticks - startTime.Ticks) })
        .ToArray();
    return notifications;
}
public static IEnumerable<OHLC> FromYahooFinanceCsv(string payload)
{
    // Only the data rows match this pattern, so the header rows are skipped.
    var regex = new Regex(@"^(?<timestamp>\d{10}),(?<close>\d+(\.\d+)),(?<high>\d+(\.\d+)),(?<low>\d+(\.\d+)),(?<open>\d+(\.\d+)),(?<volume>\d+)$", RegexOptions.Multiline);
    var matches = regex.Matches(payload);
    foreach (Match match in matches)
    {
        var tick = new OHLC
        {
            Open = Decimal.Parse(match.Groups["open"].Value),
            High = Decimal.Parse(match.Groups["high"].Value),
            Low = Decimal.Parse(match.Groups["low"].Value),
            Close = Decimal.Parse(match.Groups["close"].Value),
            Volume = long.Parse(match.Groups["volume"].Value),
            Timestamp = FromUnixTime(long.Parse(match.Groups["timestamp"].Value))
        };
        yield return tick;
    }
}
public static DateTime FromUnixTime(long unixTime)
{
    var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
    return epoch.AddSeconds(unixTime);
}
public class OHLC
{
    public decimal Open { get; set; }
    public decimal High { get; set; }
    public decimal Low { get; set; }
    public decimal Close { get; set; }
    public long Volume { get; set; }
    public DateTimeOffset Timestamp { get; set; }
}
This is a LINQPad sample, but it could be adapted to a console application by replacing the .Dump() extension method with:
.Subscribe(
    x => Console.WriteLine(x),
    ex => Console.WriteLine(ex),
    () => Console.WriteLine("--Completed--"));