Implementing A Leaky Bucket Client In .NET Core

Rate limiting web services and APIs is nothing new, but I would say over the past couple of years the “leaky bucket” strategy of rate limiting has rised in popularity to the point where it’s almost a defacto these days. The leaky bucket strategy is actually quite simple – made even easier by the fact that it’s name comes from a very physical and real world scenario.

What Is A Leaky Bucket Rate Limit?

Imagine a 4 litre physical bucket that has a hole in it. The hole leaks out 1 litre of water every minute. Now you can dump 4 litres of water in the bucket all at once just fine, but still it will flow out at 1 litre per minute until it’s empty. If you did fill it immediately, after 1 minute, you could make 1 request but then have to wait another minute. Or you could wait 2 minutes and make 2 at once etc.  The general idea behind using a leaky bucket scenario over something like “1 call per second” is that it allows you to “burst” through calls until the bucket is full, and then wait a period of time until the bucket drains. However even while the bucket is draining you can trickle in calls if required.

I generally see it applied on API’s that to complete a full operation may take 2 or 3 API calls, so limiting to 1 per second is pointless. But allowing a caller to burst through enough calls to complete their operation, then back off, is maybe a bit more realistic.

Implementing The Client

This article is going to talk about a leaky bucket “client”. So I’m the one calling a web service that has a leaky bucket rate limitation in place. In a subsequent article we will look at how we implement this on the server end.

Now I’m the first to admit, this is unlikely to win any awards and I typically shoot myself in the foot with threadsafe coding, but here’s my attempt at a leaky bucket client that for my purposes worked a treat, but it was just for a small personal project so your mileage may vary.

class LeakyBucket
{
    private readonly BucketConfiguration _bucketConfiguration;
    private readonly ConcurrentQueue<DateTime> currentItems;
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

    private Task leakTask;

    public LeakyBucket(BucketConfiguration bucketConfiguration)
    {
        _bucketConfiguration = bucketConfiguration;
        currentItems = new ConcurrentQueue<DateTime>();
    }

    public async Task GainAccess(TimeSpan? maxWait = null)
    {
        //Only allow one thread at a time in. 
        await semaphore.WaitAsync(maxWait ?? TimeSpan.FromHours(1));
        try
        {
            //If this is the first time, kick off our thread to monitor the bucket. 
            if(leakTask == null)
            {
                leakTask = Task.Factory.StartNew(Leak);
            }

            while (true)
            {
                if (currentItems.Count >= _bucketConfiguration.MaxFill)
                {
                    await Task.Delay(1000);
                    continue;
                }

                currentItems.Enqueue(DateTime.UtcNow);
                return;
            }
        }finally
        {
            semaphore.Release();
        }
                
    }

    //Infinite loop to keep leaking. 
    private void Leak()
    {
        //Wait for our first queue item. 
        while(currentItems.Count == 0)
        {
            Thread.Sleep(1000);
        }

        while(true)
        {
            Thread.Sleep(_bucketConfiguration.LeakRateTimeSpan);
            for(int i=0; i < currentItems.Count && i < _bucketConfiguration.LeakRate; i++)
            {
                DateTime dequeueItem;
                currentItems.TryDequeue(out dequeueItem);
            }
        }
    }
}

class BucketConfiguration
{
    public int MaxFill { get; set; }
    public TimeSpan LeakRateTimeSpan { get; set; }
    public int LeakRate { get; set; }
}

There is a bit to unpack there but I’ll do my best.

  • We have a class called BucketConfiguration which specifies how full the bucket can get, and how much it leaks.
  • Our main method is called “GainAccess” and this will be called each time we want to send a request.
  • We use a SemaphoreSlim just incase this is used in a threaded scenario so that we queue up calls and not get tangled up in our own mess
  • On the first call to gain access, we kick off a thread that is used to “empty” the bucket as we go.
  • Then we enter in a loop. If the current items on the queue is the max fill, then we just wait a second and try again.
  • When there is room on the queue, we pop our time on and return, thus gaining access.

Now I’ve used a queue here but you really don’t need to, it’s just helpful debugging which calls we are leaking etc. But really a blockingcollection or something similar is just as fine. Notice that we also kick off a thread to do our leaking. Because it’s done at a constant rate, we need a dedicated thread to be “dripping” out requests.

And finally, everything is async including our semaphore (If you are wondering why I didn’t just use the *lock* keyword, it can’t be used with async code). This means that we hopefully don’t jam up threads waiting to send requests. It’s not foolproof of course, but it’s better than hogging threads when we are essentially spinwaiting.

The Client In Action

I wrote a quick console application to show things in action. So for example :

static async Task Main(string[] args)
{
    LeakyBucket leakyBucket = new LeakyBucket(new BucketConfiguration
    {
        LeakRate = 1, 
        LeakRateTimeSpan = TimeSpan.FromSeconds(5), 
        MaxFill = 4
    });

    while (true)
    {
        await leakyBucket.GainAccess();
        Console.WriteLine("Hello World! " + DateTime.Now);
    }
}

Running this we get :

Hello World! 24/11/2019 5:08:26 PM
Hello World! 24/11/2019 5:08:26 PM
Hello World! 24/11/2019 5:08:26 PM
Hello World! 24/11/2019 5:08:26 PM
Hello World! 24/11/2019 5:08:31 PM
Hello World! 24/11/2019 5:08:36 PM
Hello World! 24/11/2019 5:08:41 PM
Hello World! 24/11/2019 5:08:46 PM

Makes sense. We do our run of 4 calls immediately, but then we have to back off to doing just 1 call every 5 seconds.

That’s pretty simple, but we can also handle complex scenarios such as leaking 2 requests instead of 1 every 5 seconds. We change our leaky bucket to :

LeakyBucket leakyBucket = new LeakyBucket(new BucketConfiguration
{
    LeakRate = 2, 
    LeakRateTimeSpan = TimeSpan.FromSeconds(5), 
    MaxFill = 4
});

And what do you know! We see our burst of 4 calls, then every 5 seconds we see us drop in another 2 at once.

Hello World! 24/11/2019 5:10:09 PM
Hello World! 24/11/2019 5:10:09 PM
Hello World! 24/11/2019 5:10:09 PM
Hello World! 24/11/2019 5:10:09 PM
Hello World! 24/11/2019 5:10:14 PM
Hello World! 24/11/2019 5:10:14 PM
Hello World! 24/11/2019 5:10:19 PM
Hello World! 24/11/2019 5:10:19 PM
Hello World! 24/11/2019 5:10:24 PM
Hello World! 24/11/2019 5:10:24 PM

2 thoughts on “Implementing A Leaky Bucket Client In .NET Core”

    • That particular Thread.Sleep is inside the background task, so it only blocks the background task and not the web thread. Although I would definitely say I’m using this inside a console application and not a web app. It will still work, but you would have to be careful about the lifetimes of the bucket etc, likely making it a singleton to be injected.

      Reply

Leave a Comment