This post is part of a series on Channel in C# .NET. Of course, it’s always better to start at Part 1, but you can skip anywhere you’d like using the links below.
Part 1 – Getting Started
Part 2 – Advanced Channels
Part 3 – Understanding Back Pressure
In our previous post we looked at some dead simple examples of how Channels worked, and we saw some pretty nifty features, but for the most part it was pretty similar to any other XQueue implementation. So let’s dive into some more advanced topics. Well.. I say advanced but so much of this is dead simple. This might read like a bit of a feature run through but there is a lot to love!
Separation Of Read/Write Concerns
If you’ve ever shared a Queue between two classes, you’ll know that either class can read/write, even if they aren’t supposed to. For example :
class MyProducer { private readonly Queue<int> _queue; public MyProducer(Queue<int> queue) { _queue = queue; } } class MyConsumer { private readonly Queue<int> _queue; public MyConsumer(Queue<int> queue) { _queue = queue; } }
So while a Producer is supposed to only write to the queue, and a Consumer is supposed to only read, in both cases they can do all operations on the queue. While you might in your own head want the Consumer to only read, another developer might come along and quite happily start calling Enqueue and there’s nothing but a code review to stop them making that mistake.
But with Channels, we can do things differently.
class Program { static async Task Main(string[] args) { var myChannel = Channel.CreateUnbounded<int>(); var producer = new MyProducer(myChannel.Writer); var consumer = new MyConsumer(myChannel.Reader); } } class MyProducer { private readonly ChannelWriter<int> _channelWriter; public MyProducer(ChannelWriter<int> channelWriter) { _channelWriter = channelWriter; } } class MyConsumer { private readonly ChannelReader<int> _channelReader; public MyConsumer(ChannelReader<int> channelReader) { _channelReader = channelReader; } }
In this example I’ve added a main method to show you how the creation of the writer/reader happen, but it’s dead simple. So here we can see that for our Producer, I’ve passed it only a ChannelWriter, so it can only do write operations. And for our Consumer, we’ve passed it a ChannelReader so it can only read.
Of course it doesn’t mean that another developer can’t just modify the code and start injecting the root Channel object, or passing in both the ChannelWriter/ChannelReader, but it atleast outlays much better what the intention of the code is.
Completing A Channel
We saw earlier that when we call ReadAsync() on a channel, it will actually sit there waiting for messages, but what if there isn’t any more messages coming? Maybe this is a one time batch job and the batch is completed. Normally with other Queues in .NET, we would have to have some sort of shared boolean and/or a CancellationToken be passed around. But with Channels, it’s even easier.
Consider the following :
static async Task Main(string[] args) { var myChannel = Channel.CreateUnbounded<int>(); _ = Task.Factory.StartNew(async () => { for (int i = 0; i < 10; i++) { await myChannel.Writer.WriteAsync(i); } myChannel.Writer.Complete(); }); try { while (true) { var item = await myChannel.Reader.ReadAsync(); Console.WriteLine(item); await Task.Delay(1000); } }catch(ChannelClosedException e) { Console.WriteLine("Channel was closed!"); } }
I’ve made it so that our second thread writes to our channel as fast as possible, then completes it. Then our reader slowly reads with a delay of 1 second between reads. Notice that we catch the ChannelClosedExecption, this is called when you try and read from the closed channel *after* the final message.
I just want to make that clear. Calling Complete() on a channel does not immediately close the channel and kill everyone reading from it. It’s instead a way to say to notify any readers that once the last message is read, we’re done. That’s important because it means it doesn’t matter if the Complete() is called while we are waiting for new items, while the queue is empty, while it’s full etc. We can be sure that we will complete all available work then finish up.
Using IAsyncEnumerable With Channels
If we take our example when we try and close a channel, there are two things that stick out to me.
- We have a while(true) loop. And this isn’t really that bad, but it’s a bit of an eyesore.
- To break out of this loop, and to know that the channel is completed, we have to catch an exception and essentially swallow it.
These problems are solved using the command “ReadAllAsync()” that returns an IAsyncEnumerable (A bit more on how IAsyncEnumerable works right here). The code looks a bit like so :
static async Task Main(string[] args) { var myChannel = Channel.CreateUnbounded<int>(); _ = Task.Factory.StartNew(async () => { for (int i = 0; i < 10; i++) { await myChannel.Writer.WriteAsync(i); } myChannel.Writer.Complete(); }); await foreach(var item in myChannel.Reader.ReadAllAsync()) { Console.WriteLine(item); await Task.Delay(1000); } }
Now the code reads a lot better and removes some of the extra gunk around catching the exception. Because we are using an IAsyncEnumerable, we can still wait on each item like we previously did, but we no longer have to catch an exception because when the channel completes, it simply says it has nothing more and the loop exits.
Again, this gets rid of some of the messy code you used to have to write when dealing with queues. Where previously you had to write some sort of infinite loop with a breakout clause, now it’s just a real tidy loop that handles everything under the hood.
What’s Next
So far, we’ve been using “Unbounded” channels. And as you’ve probably guessed, of course there is an option to use a BoundedChannel instead. But what is this? And how does the term “back pressure” relate to it? Check out the next part of this series on better understanding of back pressure.
This is really interesting… I really like that it’s thread-safe. I’ll have to play around with this to really see how it works. I do a lot of Kafka related work so this is appealing.