
Extending the calculator implementation
Creating a Simple Moving Average calculator in C# - Part 3

In this post I show the final implementation of a Simple Moving Average calculator that I wrote for a feature in the Datadog APM tracer. This builds on the examples shown in previous posts to create a version specific to our use case.

I'll start by describing the design requirements and assumptions I made, before showing the full implementation. I then discuss the implementation in more detail.

Background: distributed tracing

Since January I've been working at Datadog on the team building the .NET Application Performance Monitoring (APM) tracer. APM can be used to automatically instrument calls to your web applications, databases, and other services, and to provide distributed tracing without having to modify your application.

I work on the open source .NET tracer library, and for a recent feature I needed to write a Simple Moving Average calculator that would keep track of the number of traces we had to drop due to an error. The details aren't very important for this discussion, but I think it helps to have some context for the following code.

A "trace" describes a complete operation of some sort, with each "step" in that operation described by a "span". A span could describe a database operation or a request to a web service, for example, while the trace would be the collection of linked spans that occurred during an incoming web request.

The feature I was working on was to track the number of traces that we send to the backend for processing, compared to the number of traces which fail to send (maybe due to a network issue), or are dropped for some other reason. For this calculation we needed to track the "keep"/"drop" rate as a Simple Moving Average.

The requirements: record a Simple Moving Average keep rate

In the first post in this series, I described a basic Simple Moving Average calculator, and in the previous post I expanded on this to make a thread-safe version by using the Interlocked primitives. Neither of these implementations would quite work for the tracer keep-rate calculator, so in this post I show the final implementation.

I'll start by listing some of the requirements and design considerations:

  • The calculator should keep track of two types of trace "events": keep and drop.
  • For each "event", multiple traces may be kept or dropped at once, but they will all be kept or all be dropped; we won't drop some and keep others in a single event.
  • We never "remove" events, i.e. the event count is monotonically increasing.
  • The implementation should be thread safe for recording events and for reading the current SMA rate.
  • We should watch out for overflow where appropriate.

In addition, I wanted to stick to the general design I showed in my previous post, where the calculator is responsible for keeping track of how many events have occurred within a given time period:

SMA design

In the next section I show an overview of the implementation I used, and describe how the design considerations above help influence some of the differences from my previous post.

The implementation

I've simplified the implementation in this post somewhat compared to the original, by removing some of the logging and guard clauses, but it's essentially identical to the original implementation in the .NET tracer on GitHub. I'll start by providing the full implementation, and then walk through how it all fits together.

using System;
using System.Threading;
using System.Threading.Tasks;

internal class MovingAverageKeepRateCalculator
{
    private readonly int _windowSize;
    private readonly TimeSpan _bucketDuration;
    private readonly uint[] _dropped;
    private readonly uint[] _created;

    private readonly TaskCompletionSource<bool> _processExit = new TaskCompletionSource<bool>();

    private int _index = 0;
    private ulong _sumDrops = 0;
    private ulong _sumCreated = 0;
    private double _keepRate = 0;

    private long _latestDrops = 0;
    private long _latestKeeps = 0;

    internal MovingAverageKeepRateCalculator(int windowSize, TimeSpan bucketDuration)
    {
        _windowSize = windowSize;
        _bucketDuration = bucketDuration;
        _dropped = new uint[windowSize];
        _created = new uint[windowSize];

        Task.Run(UpdateBucketTaskLoopAsync)
            .ContinueWith(t => Console.WriteLine(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Increment the number of kept traces
    /// </summary>
    public void IncrementKeeps(int count)
    {
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }

        Interlocked.Add(ref _latestKeeps, count);
    }

    /// <summary>
    /// Increment the number of dropped traces
    /// </summary>
    public void IncrementDrops(int count)
    {
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }

        Interlocked.Add(ref _latestDrops, count);
    }

    /// <summary>
    /// Get the current keep rate for traces
    /// </summary>
    public double GetKeepRate()
    {
        // Essentially Interlocked.Read(double)
        return Volatile.Read(ref _keepRate);
    }

    /// <summary>
    /// Stop updating the buckets. The current Keep rate can continue to be read.
    /// </summary>
    public void CancelUpdates()
    {
        _processExit.TrySetResult(true);
    }

    /// <summary>
    /// Update the current rate. Internal for testing only. Should not be called in normal usage.
    /// </summary>
    internal void UpdateBucket()
    {
        var index = _index;
        var previousDropped = _dropped[index];
        var previousCreated = _created[index];

        // Cap at uint.MaxValue events. Very unlikely to reach this value!
        var latestDropped = (uint)Math.Min(Interlocked.Exchange(ref _latestDrops, 0), uint.MaxValue);
        var latestCreated = (uint)Math.Min(Interlocked.Exchange(ref _latestKeeps, 0) + latestDropped, uint.MaxValue);

        var newSumDropped = _sumDrops - previousDropped + latestDropped;
        var newSumCreated = _sumCreated - previousCreated + latestCreated;

        var keepRate = newSumCreated == 0
                            ? 0
                            : 1 - ((double)newSumDropped / newSumCreated);

        Volatile.Write(ref _keepRate, keepRate);

        _sumDrops = newSumDropped;
        _sumCreated = newSumCreated;
        _dropped[index] = latestDropped;
        _created[index] = latestCreated;
        _index = (index + 1) % _windowSize;
    }

    private async Task UpdateBucketTaskLoopAsync()
    {
        while (true)
        {
            if (_processExit.Task.IsCompleted)
            {
                return;
            }

            UpdateBucket();

            await Task.WhenAny(
                Task.Delay(_bucketDuration),
                _processExit.Task)
            .ConfigureAwait(false);
        }
    }
}

Walking through the code

Much of this code is similar to previous posts, but I'll walk through each conceptual section in turn.

Keeping track of the current keep and drop count

As in the previous thread-safe implementation, the MovingAverageKeepRateCalculator is responsible for keeping track of the "current" number of events within a time bucket, periodically updating the Simple Moving Average value, and then resetting the "current events" count to 0.

We use the following fields and methods to expose this functionality:

private long _latestDrops = 0;
private long _latestKeeps = 0;

public void IncrementKeeps(int count)
{
    if (count < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(count));
    }

    Interlocked.Add(ref _latestKeeps, count);
}

public void IncrementDrops(int count)
{
    if (count < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(count));
    }

    Interlocked.Add(ref _latestDrops, count);
}

As in the previous post, we use the Interlocked helper methods to ensure that we can safely (atomically) increment these values in a thread-safe manner. It's also worth noting:

  • We don't allow passing a negative value to these methods. This doesn't make sense when describing "events", so we don't allow it.
  • The backing fields, _latestDrops and _latestKeeps, are long. Technically, this means that if you call these methods often enough between updates, you could exceed the maximum long value and have overflow issues. However, as the maximum value is over 9 quintillion (9,223,372,036,854,775,807), overflow is not something we need to worry about 😉 Realistically, int would be fine here too.
  • We only need to increment drops or keeps, not both at once. That means it's fine for us to store the values for each as separate fields, rather than having to update them both together atomically.
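
As a rough illustration of why Interlocked.Add() matters here, the sketch below increments a shared long from many threads at once and checks that no increments are lost. It is not part of the tracer: the InterlockedAddDemo type and its method are hypothetical names used only for this example.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo type for illustration; not part of the tracer
public static class InterlockedAddDemo
{
    // Adds `count` to a shared counter `iterations` times, spread across threads
    public static long CountConcurrently(int iterations, int count)
    {
        long total = 0;

        Parallel.For(0, iterations, _ =>
        {
            // Atomic read-modify-write, so concurrent callers never lose an update
            Interlocked.Add(ref total, count);
        });

        return Interlocked.Read(ref total);
    }
}
```

Calling InterlockedAddDemo.CountConcurrently(10000, 3) should always return 30000. Replacing the Interlocked.Add() call with a plain total += count would make the result unreliable under contention.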

The methods above are called by the "event generator" thread, and contribute to the keep and drop count in the "current" time bucket. Periodically (every 1 second for example) we use those values to update the SMA keep-rate value.

Calculating the keep rate

The "update loop" for the calculator is where most of the action happens. It runs on a single thread, separate from the "event generator". The implementation for "run this thread in a loop until cancellation is requested" is the same as in the previous post, so rather than duplicate it here, I'll simply refer you back to that post. Instead, we'll look at the UpdateBucket() method itself, and the fields it depends on.

private readonly int _windowSize;
private readonly TimeSpan _bucketDuration;
private readonly uint[] _dropped;
private readonly uint[] _created;

private int _index = 0;
private ulong _sumDrops = 0;
private ulong _sumCreated = 0;
private double _keepRate = 0;

private long _latestDrops = 0;
private long _latestKeeps = 0;

internal MovingAverageKeepRateCalculator(int windowSize, TimeSpan bucketDuration)
{
    _windowSize = windowSize;
    _bucketDuration = bucketDuration;
    _dropped = new uint[windowSize];
    _created = new uint[windowSize];

    // ...
}

internal void UpdateBucket()
{
    var index = _index;
    var previousDropped = _dropped[index];
    var previousCreated = _created[index];

    // Cap at uint.MaxValue events. Very unlikely to reach this value!
    var latestDropped = (uint)Math.Min(Interlocked.Exchange(ref _latestDrops, 0), uint.MaxValue);
    var latestCreated = (uint)Math.Min(Interlocked.Exchange(ref _latestKeeps, 0) + latestDropped, uint.MaxValue);

    var newSumDropped = _sumDrops - previousDropped + latestDropped;
    var newSumCreated = _sumCreated - previousCreated + latestCreated;

    var keepRate = newSumCreated == 0
                        ? 0
                        : 1 - ((double)newSumDropped / newSumCreated);

    Volatile.Write(ref _keepRate, keepRate);

    _sumDrops = newSumDropped;
    _sumCreated = newSumCreated;
    _dropped[index] = latestDropped;
    _created[index] = latestCreated;
    _index = (index + 1) % _windowSize;
}

Let's start with the constructor, which takes two values:

  • bucketDuration: how long to accumulate events before we fix the value and reset it to 0.
  • windowSize: how many time periods should the SMA be calculated over.

From the windowSize parameter (called k in my previous posts), we create arrays to hold the previously accumulated values for both the dropped count and the created count. I used uint values, on the basis that it's very unlikely we'll have to deal with more than 4 billion events in a time period of 1 second!

Note that we're storing a "created" count here rather than a "kept" count. That's because our SMA for the keep rate is given as:

var keepRate = 1 - ((double)dropCount / createdCount);

where createdCount = dropCount + keepCount.

The UpdateBucket() method itself is, again, very similar to the approach in my previous post, with the exception that we're now working with two counts instead of just one.

We start by reading the previousDropped and previousCreated values, which are the values that we are going to replace with our new "current" values. We then grab the latest drop and keep counts from the _latestDrops and _latestKeeps fields, replacing the values with 0 using the same Interlocked.Exchange() method we used in the last post. For example, for the _latestDrops field we use:

Interlocked.Exchange(ref _latestDrops, 0)

Now, the _latestDrops field is a long, whereas the _dropped array uses uint values. long.MaxValue is much larger than uint.MaxValue, so we also use Math.Min() to "cap" the value at uint.MaxValue. In practice, it's very unlikely the drop count would exceed this, but if it somehow did, casting the long to uint would silently wrap around in C#'s default unchecked context (or throw an OverflowException in a checked context), so we're playing it safe:

long rawLatestDropped = Interlocked.Exchange(ref _latestDrops, 0);
uint latestDropped = (uint)Math.Min(rawLatestDropped, uint.MaxValue);

We fetch the _latestKeeps value in a similar way, and use it to calculate the latestCreated by adding latestDropped and using Math.Min() again.
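
To make the effect of the cap concrete, here is a small sketch comparing the clamped conversion with a bare cast. The CountClamp type and its method names are hypothetical (the tracer inlines this logic), and the sketch assumes the input is never negative, which holds for our event counts.

```csharp
using System;

// Hypothetical helper for illustration; the tracer inlines this logic
public static class CountClamp
{
    // Clamp a non-negative long count into the uint range
    public static uint ToUInt32Saturating(long value)
    {
        // uint.MaxValue widens to long, so Math.Min(long, long) is used
        return (uint)Math.Min(value, uint.MaxValue);
    }

    // Without the clamp, an unchecked cast keeps only the low 32 bits
    public static uint ToUInt32Truncating(long value)
    {
        return unchecked((uint)value);
    }
}
```

For a value of uint.MaxValue + 1L, ToUInt32Saturating() returns uint.MaxValue (4,294,967,295), whereas ToUInt32Truncating() wraps around to 0. A wrapped value would make a very busy bucket look like an empty one, which is why capping is the safer choice.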

Once we have these values, we can calculate the "sum" part of the SMA for drop count by taking the current sum, _sumDrops, subtracting the "old" value that is falling out of the time window, previousDropped, and adding the new value, latestDropped:

var newSumDropped = _sumDrops - previousDropped + latestDropped;

Finally, we calculate the "keep rate"—the proportion of traces that were kept out of all the traces that were created. If we haven't created any traces, then this rate doesn't make sense, so we just return 0.

double keepRate = newSumCreated == 0 
                  ? 0 
                  : 1 - ((double)newSumDropped / newSumCreated);

Note that, unlike my previous posts, we're calculating a "drop/keep" rate here, so we don't need to divide by the window size, as that cancels out in the calculation.
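
To spell out why the window size cancels, write d_i and c_i for the dropped and created counts in bucket i (symbols I'm introducing here purely for illustration), with k as the window size. Dividing one moving average by the other gives:

```latex
\[
\text{keepRate}
  = 1 - \frac{\frac{1}{k}\sum_{i=1}^{k} d_i}{\frac{1}{k}\sum_{i=1}^{k} c_i}
  = 1 - \frac{\sum_{i=1}^{k} d_i}{\sum_{i=1}^{k} c_i}
\]
```

The two 1/k factors cancel, so only the running sums (_sumDrops and _sumCreated in the code) are needed.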

We now have the keepRate variable, and we need to update the _keepRate field in an atomic, thread-safe way. In the previous post I used the Interlocked classes to write a long field, while in this implementation, I'm using the Volatile class, but the result is essentially the same—we write the _keepRate field in a thread-safe atomic way.

Volatile.Write(ref _keepRate, keepRate);

Interlocked vs Volatile vs volatile is an interesting issue that I'll dig into in a separate post!

With the _keepRate field updated, we now update all our other fields, such as _sumDrops, _dropped[index], and _index. Note that we don't have to worry about making these updates thread-safe, as we only access these values from the update thread.
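
To follow the sliding-window arithmetic by hand, here is a stripped-down, single-threaded sketch of the same idea. It is not the tracer's code: KeepRateWindow and AddBucket() are hypothetical names, and the Interlocked calls, the uint cap, and the background task are all left out so that each step is easy to trace.

```csharp
using System;

// Simplified, single-threaded stand-in for the calculator (hypothetical type)
public sealed class KeepRateWindow
{
    private readonly uint[] _dropped;
    private readonly uint[] _created;
    private int _index;
    private ulong _sumDropped;
    private ulong _sumCreated;

    public KeepRateWindow(int windowSize)
    {
        _dropped = new uint[windowSize];
        _created = new uint[windowSize];
    }

    // Close the current bucket with the given counts and return the new keep rate
    public double AddBucket(uint keeps, uint drops)
    {
        // No overflow cap here, unlike the real implementation
        uint created = keeps + drops;

        // Slide the window: subtract the bucket being overwritten, add the new one
        _sumDropped = _sumDropped - _dropped[_index] + drops;
        _sumCreated = _sumCreated - _created[_index] + created;

        _dropped[_index] = drops;
        _created[_index] = created;
        _index = (_index + 1) % _dropped.Length;

        return _sumCreated == 0 ? 0 : 1 - ((double)_sumDropped / _sumCreated);
    }
}
```

With a window size of 2, calling AddBucket(3, 1), then AddBucket(4, 0), then AddBucket(0, 4) returns 0.75, 0.875, and 0.5 respectively. The third call overwrites the first bucket, so the sums become 4 dropped out of 8 created.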

Reading the keep rate

All that remains is for the "reporter" thread to read the keep rate. The GetKeepRate() method uses Volatile.Read() to read the _keepRate field atomically, so it's safe to call from any thread.

public double GetKeepRate()
{
    return Volatile.Read(ref _keepRate);
}
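
As a rough way to convince yourself that readers never see a half-written value, the sketch below has one task repeatedly publish two different doubles while another reads them back. VolatileRateDemo is a hypothetical type, and the values 0.1 and 0.7 are arbitrary choices whose bit patterns differ in both 32-bit halves. On a 64-bit runtime plain double reads are typically atomic anyway, so treat this as a sanity check rather than a proof.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo type for illustration; not part of the tracer
public static class VolatileRateDemo
{
    private static double _rate = 0.1;

    // Returns how many reads observed a value that was never written
    public static int CountTornReads(int iterations)
    {
        const double a = 0.1;
        const double b = 0.7;
        int torn = 0;

        var writer = Task.Run(() =>
        {
            for (int i = 0; i < iterations; i++)
            {
                // Publish one of the two known values
                Volatile.Write(ref _rate, (i % 2 == 0) ? a : b);
            }
        });

        var reader = Task.Run(() =>
        {
            for (int i = 0; i < iterations; i++)
            {
                double value = Volatile.Read(ref _rate);

                // A torn read would mix halves of a and b, matching neither
                if (value != a && value != b)
                {
                    torn++;
                }
            }
        });

        Task.WaitAll(writer, reader);
        return torn;
    }
}
```

VolatileRateDemo.CountTornReads(100000) should return 0: every value the reader sees is one that the writer actually published.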

And that's it: we've finished the implementation of the keep-rate Simple Moving Average calculator. As a reminder, you can see the full implementation on GitHub.

Summary

In this post I expanded on the thread-safe calculator from the previous post. I described some of the design constraints I was working with for a recent project, and how they affect the design. Finally, I walked through the implementation of applying a Simple Moving Average to the keep-rate of distributed traces.

Andrew Lock | .Net Escapades

Stay up to the date with the latest posts!

Oops! Check your details and try again.
Thanks! Check your email for confirmation.