Collection of utility extensions for async streams
With the introduction of async streams in .NET Core 3.0, represented by the interface IAsyncEnumerable<T>, Microsoft standardized the way .NET developers implement asynchronous streams. C# 8 added direct language support: we can iterate with await foreach, or easily implement a new asynchronous stream by defining async IAsyncEnumerable<T> as the method result and using yield return/yield break, just like we do for IEnumerable<T>.
Even if we don't realize it, we probably use async streams on a daily basis: from Entity Framework Core to ASP.NET Core, they have become an important and widely adopted part of .NET. In this article I'm going to show some of the most common scenarios I face when working directly with IAsyncEnumerable<T> and how I usually solve them.
Utility extensions for async streams
When implementing an extension for an async stream there are a few guidelines to keep in mind.
Firstly, don't validate function inputs (like null checks) inside the async iterator method; instead, create a wrapper method that validates the inputs and then calls an internal implementation. This keeps the stack trace cleaner, lighter, and easier to analyze.
Secondly, your code will surely need a CancellationToken, but don't define it in the public method signature, even with a default value. To make developers' lives easier, instead of requiring a CancellationToken on every method, Microsoft provides the WithCancellation extension method that can be used on any IAsyncEnumerable<T>. If you mark the token parameter in your method's signature with the EnumeratorCancellation attribute, the compiler will automatically flow the token passed to WithCancellation into your method. Defining the token as a parameter in your public signature would only confuse the developers using it.
Imagine you were creating a simple Where extension to filter an async stream; following the guidelines, it would be implemented as follows:
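A minimal sketch, assuming a containing static class (here named AsyncEnumerableExtensions, which is illustrative):

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    public static IAsyncEnumerable<T> Where<T>(
        this IAsyncEnumerable<T> source,
        Func<T, bool> predicate
    )
    {
        // validate in the wrapper so the iterator's stack trace stays clean
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(predicate);

        return Core(source, predicate);

        static async IAsyncEnumerable<T> Core(
            IAsyncEnumerable<T> source,
            Func<T, bool> predicate,
            [EnumeratorCancellation] CancellationToken ct = default
        )
        {
            await foreach (var item in source.WithCancellation(ct))
            {
                if (predicate(item))
                    yield return item;
            }
        }
    }
}
```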
As you can see, the validations are done outside the actual implementation, and the CancellationToken is only defined internally.
I recommend taking a good look at the source code of System.Linq.Async, the officially supported LINQ extensions for async streams.
Let’s now drill down to some of my most used utility extensions.
Timeout between fetches
When receiving data from an IAsyncEnumerable<T>, we may not know how many items will arrive or how long the whole stream will take, but we may still want to enforce a maximum timeout between items so the application doesn't wait indefinitely.
One very common scenario nowadays is the integration with Large Language Model (LLM) APIs that use Server-Sent Events (SSE) to stream response tokens to be shown to the user in real time. Even with a loading indicator somewhere, if a response token takes too long the user will probably think the application just stopped working.
The following code links a CancellationTokenSource to the original CancellationToken and uses its internal timer to automatically cancel after a given time has passed, throwing a TaskCanceledException while waiting for the next item.
```csharp
public static IAsyncEnumerable<T> WithTimeout<T>(
    this IAsyncEnumerable<T> source,
    int millisecondsTimeout
)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentOutOfRangeException.ThrowIfNegativeOrZero(millisecondsTimeout);

    return Core(source, millisecondsTimeout);

    static async IAsyncEnumerable<T> Core(
        IAsyncEnumerable<T> source,
        int millisecondsTimeout,
        [EnumeratorCancellation] CancellationToken ct = default
    )
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        cts.CancelAfter(millisecondsTimeout);

        await foreach (var element in source.WithCancellation(cts.Token))
        {
            // disable the timer while the item is being processed
            cts.CancelAfter(Timeout.Infinite);

            yield return element;

            // re-enable the timer before fetching the next item
            cts.CancelAfter(millisecondsTimeout);
        }
    }
}
```
I like this approach because it's simple and, apart from the extra allocation of a CancellationTokenSource and its internal Timer (which is reused for every item), there's not much more to it, making it a very efficient implementation. It can be used just like any LINQ method:
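A usage sketch, assuming the extension above is named WithTimeout and a hypothetical GetCompletionStreamAsync method that streams LLM response tokens:

```csharp
// GetCompletionStreamAsync is hypothetical: any IAsyncEnumerable<string> works
await foreach (var token in GetCompletionStreamAsync(prompt).WithTimeout(5_000))
{
    // cancels with a TaskCanceledException if any single token
    // takes more than 5 seconds to arrive
    Console.Write(token);
}
```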
If you are like me and prefer to receive a TimeoutException, you can change the implementation to manually iterate over the enumerator instead of using await foreach. If a TaskCanceledException is thrown but the received CancellationToken isn't cancelled, you can assume it was due to the timeout and throw a TimeoutException instead.
```csharp
static async IAsyncEnumerable<T> Core(
    IAsyncEnumerable<T> source,
    int millisecondsTimeout,
    [EnumeratorCancellation] CancellationToken ct = default
)
{
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    cts.CancelAfter(millisecondsTimeout);

    await using var enumerator = source.GetAsyncEnumerator(cts.Token);

    while (await MoveNextCheckTimeoutAsync(enumerator, ct))
    {
        // disable the timer while the item is being processed
        cts.CancelAfter(Timeout.Infinite);

        yield return enumerator.Current;

        // re-enable the timer before fetching the next item
        cts.CancelAfter(millisecondsTimeout);
    }

    static async ValueTask<bool> MoveNextCheckTimeoutAsync(
        IAsyncEnumerator<T> enumerator,
        CancellationToken ct
    )
    {
        try
        {
            return await enumerator.MoveNextAsync();
        }
        catch (TaskCanceledException e) when (!ct.IsCancellationRequested)
        {
            throw new TimeoutException(
                "The next item took longer than expected to be received", e);
        }
    }
}
```
Batch with maximum waiting time
When receiving data from an IAsyncEnumerable<T> to be persisted in some database, it may be a good idea to batch the items instead of storing them one by one, reducing and optimizing the insertions.
Imagine you are storing data into some relational table: if you receive 500 items, that's 500 database accesses, which may affect the performance of your overall application. If instead you create batches of 10 items, that means only 50 database accesses with more time in between, certainly reducing the overall database load.
If we use MoreLINQ's Batch method as an example, we can easily convert it to an IAsyncEnumerable<T> implementation.
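A sketch of such a conversion, following the same guidelines as above (the containing class name is illustrative):

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableBatchExtensions
{
    public static IAsyncEnumerable<T[]> Batch<T>(
        this IAsyncEnumerable<T> source,
        int size
    )
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentOutOfRangeException.ThrowIfNegativeOrZero(size);

        return Core(source, size);

        static async IAsyncEnumerable<T[]> Core(
            IAsyncEnumerable<T> source,
            int size,
            [EnumeratorCancellation] CancellationToken ct = default
        )
        {
            T[] batch = null;
            var count = 0;

            await foreach (var item in source.WithCancellation(ct))
            {
                batch ??= new T[size];
                batch[count++] = item;

                if (count != size)
                    continue;

                yield return batch;
                batch = null;
                count = 0;
            }

            // return any remaining items as a final, smaller batch
            if (count > 0)
            {
                Array.Resize(ref batch, count);
                yield return batch;
            }
        }
    }
}
```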
Another common scenario when batching items is to support a timeout parameter that will either return a complete batch or, after a given time has passed, return whatever items have already been received.
This can be easily implemented by calculating a timeout date and checking whether it has passed every time an item is received. If the given amount of time has passed, you simply resize the collection to its current size, return that batch, and calculate the next timeout date.
```csharp
public static IAsyncEnumerable<T[]> Batch<T>(
    this IAsyncEnumerable<T> source,
    int size,
    int millisecondsTimeout
)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentOutOfRangeException.ThrowIfNegativeOrZero(size);
    ArgumentOutOfRangeException.ThrowIfNegative(millisecondsTimeout);

    return Core(source, size, millisecondsTimeout);

    static async IAsyncEnumerable<T[]> Core(
        IAsyncEnumerable<T> source,
        int size,
        int millisecondsTimeout,
        [EnumeratorCancellation] CancellationToken ct = default
    )
    {
        T[] batch = null;
        var count = 0;
        var timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsTimeout);

        await foreach (var item in source.WithCancellation(ct))
        {
            batch ??= new T[size];
            batch[count++] = item;

            if (count != size)
            {
                // batch not full: keep fetching unless the timeout has passed
                if (timeoutOn > DateTime.UtcNow)
                    continue;

                // timeout reached: resize to return a partial batch
                Array.Resize(ref batch, count);
            }

            yield return batch;

            batch = null;
            count = 0;
            timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsTimeout);
        }

        // return any remaining items as a final, partial batch
        if (count > 0)
        {
            Array.Resize(ref batch, count);
            yield return batch;
        }
    }
}
```
I like this approach because it is very simple to understand and efficient, since it only allocates an extra DateTime on top of the initial Batch method. The only downside is that it only times out after an item is received, not while waiting. Solving that limitation would require a timer and constant allocations of TaskCompletionSource instances which, at least for my use cases, aren't worth the extra complexity and performance overhead.
Again, this method can easily be used just like other LINQ extensions:
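For instance, persisting a stream in batches of up to 100 items, waiting at most 5 seconds before flushing a partial batch (ReadEventsAsync and repository are hypothetical stand-ins):

```csharp
// ReadEventsAsync and repository are hypothetical
await foreach (var batch in ReadEventsAsync().Batch(size: 100, millisecondsTimeout: 5_000))
{
    await repository.InsertAsync(batch);
}
```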
Throttling

One last scenario I usually face is applying some throttling to prevent overloading the application when there's a high throughput of items received from the IAsyncEnumerable<T>. It usually combines very well with the timeout Batch method to enforce a predictable cadence of data.
The implementation is very simple: you calculate the timeout date, fetch the next item, and if not enough time has passed you simply delay for the remaining time, repeating the process for each item.
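Following that description, a Throttle extension could be sketched like this (the method and class names are illustrative):

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableThrottleExtensions
{
    public static IAsyncEnumerable<T> Throttle<T>(
        this IAsyncEnumerable<T> source,
        int millisecondsInterval
    )
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentOutOfRangeException.ThrowIfNegative(millisecondsInterval);

        return Core(source, millisecondsInterval);

        static async IAsyncEnumerable<T> Core(
            IAsyncEnumerable<T> source,
            int millisecondsInterval,
            [EnumeratorCancellation] CancellationToken ct = default
        )
        {
            await foreach (var item in source.WithCancellation(ct))
            {
                // calculate when the next item is allowed to flow through
                var nextOn = DateTime.UtcNow.AddMilliseconds(millisecondsInterval);

                yield return item;

                // if consuming the item took less than the interval,
                // delay for the remaining time before fetching the next one
                var remaining = nextOn - DateTime.UtcNow;
                if (remaining > TimeSpan.Zero)
                    await Task.Delay(remaining, ct);
            }
        }
    }
}
```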
In this article I've shown some of the most common utility methods I use when working with IAsyncEnumerable<T>: implementing timeouts while waiting for the next item, batching with a maximum wait time, and throttling to prevent application overload.
Feel free to change them in ways that make sense to you, like creating overloads that receive a TimeSpan, or use them as examples for creating your own utility methods.
Here’s the full code sample so you can put it in your own projects.