Getting Started with Async Enumerables: A .NET Developer's Guide

AsyncEnumerables enhance the enumeration of collections with asynchronous capabilities. In this post, let's explore how C# combines 'yield return' with 'async' and 'await' to create efficient asynchronous data streams and how 'await foreach' lets us effortlessly consume them.

Rahul Pulikkot Nath


Enumeration of collections has been the bread-and-butter of many programs.

But what if we could enhance this familiar concept with asynchronous capabilities?

That's precisely what .NET Async Enumerable does.


For the demo, I will show you a simple console application that looks through rating files stored in an Amazon S3 bucket to find the first n x-star ratings for a given product.

We will examine some of the challenges of standard approaches and then learn how to use AsyncEnumerable to make this more efficient.

While we're using S3 here, AsyncEnumerables aren't limited to AWS. They're helpful for efficiently processing streams of data from any source—databases, files, APIs, or real-time feeds.

AWS sponsors this post.

Asynchronous Stream without IAsyncEnumerable

Let's explore our use case of getting the first n x-star product ratings from the S3 bucket in more detail and see it in action.


The code below uses the GetAllFileS3Objects method to list every object in the Amazon S3 bucket, using the AWSSDK.S3 NuGet package.

Since the S3 SDK returns paged results of objects, the function builds up an internal list of object files before returning the entire collection.

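using Amazon.S3;
using Amazon.S3.Model;

// Assumes credentials and region come from the default AWS SDK configuration.
var client = new AmazonS3Client();
// Placeholder: set this to the bucket that holds your rating files.
var bucketName = "your-ratings-bucket";

// GetRating (downloads and parses one rating file) and ProductRating are not shown here.
var ratings = await GetAllRatings(client);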

var filteredRatings = ratings
    .Where(r => r.Rating == 3)
    .Take(50)
    .Select(r => r.Review);
    
Console.WriteLine(string.Join(Environment.NewLine, filteredRatings));

async Task<List<ProductRating>> GetAllRatings(AmazonS3Client s3Client)
{
    var s3Objects = await GetAllFileS3Objects(s3Client);
    var productRatings = new List<ProductRating>();
    var fileReadCounter = 0;
    foreach (var s3Object in s3Objects)
    {
       var rating = await GetRating(s3Client, s3Object);
       productRatings.Add(rating);
       Console.WriteLine("File {0} read {1}", s3Object.Key, ++fileReadCounter);
    }

    return productRatings;
}

async Task<IEnumerable<S3Object>> GetAllFileS3Objects(AmazonS3Client amazonS3Client)
{
    ListObjectsV2Response? s3ObjectsResponse = null;
    var result = new List<S3Object>();
    do
    {
        s3ObjectsResponse = await amazonS3Client.ListObjectsV2Async(new ListObjectsV2Request()
        {
            BucketName = bucketName,
            ContinuationToken = s3ObjectsResponse?.NextContinuationToken,
            StartAfter = s3ObjectsResponse?.StartAfter
        });


        result.AddRange(s3ObjectsResponse.S3Objects);
        Console.WriteLine("Total Objects retrieved {0}", result.Count);
    } while (s3ObjectsResponse.IsTruncated);

    return result;
}

Iterating over all objects in Amazon S3 and building up the whole list of objects before any processing starts.

The GetAllRatings method loads the entire list of objects in the S3 bucket into server memory, then loops through each one to retrieve the actual file from S3, read its JSON contents, and parse it into a ProductRating object.

The top-level code in Program.cs then filters this final list down to the 3-star ratings, takes the first 50, and prints their reviews to the console.

⚠️
Looping through paged results fetched over I/O is one scenario where IAsyncEnumerable can help.

Problems with the Current Code

Before proceeding, see whether you can spot some of the high-level problems with the code above.

Now that you've given it some thought, let's discuss:

  • Entire list in memory: We need to build the entire list of S3 objects in memory before starting to process them.
  • Time and resource consumption: Depending on the number and size of the objects in the bucket, this will consume significant time and server resources.
  • Potential memory issues: If the objects are large, you could potentially run into memory issues, which might even bring down the entire application.
  • Unnecessary reads: We only need the first 50 objects that match the criteria. All objects beyond that are read unnecessarily and discarded.
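To make the last two points concrete, here is a minimal, self-contained C# sketch of the same eager pattern. It does not call AWS: FetchPageAsync and the in-memory list of 5,000 keys are hypothetical stand-ins for ListObjectsV2 and the bucket, served 1,000 keys per page with a continuation token. Even though only 50 keys are used, every page is fetched and held in memory first.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a paged API such as ListObjectsV2:
// 5,000 keys served 1,000 at a time, with a continuation token.
var allKeys = Enumerable.Range(1, 5000).Select(i => $"rating-{i:D4}.json").ToList();
const int pageSize = 1000;
var pagesFetched = 0;

async Task<(List<string> PageKeys, int? NextToken)> FetchPageAsync(int? token)
{
    await Task.Delay(10); // simulate network latency
    pagesFetched++;
    var start = token ?? 0;
    var pageKeys = allKeys.Skip(start).Take(pageSize).ToList();
    var nextToken = start + pageSize < allKeys.Count ? (int?)(start + pageSize) : null;
    return (pageKeys, nextToken);
}

// Eager version: accumulate every page before returning anything.
async Task<List<string>> GetAllKeysAsync()
{
    var result = new List<string>();
    int? token = null;
    do
    {
        var page = await FetchPageAsync(token);
        result.AddRange(page.PageKeys);
        token = page.NextToken;
    } while (token is not null);
    return result;
}

var keys = await GetAllKeysAsync();
var firstFifty = keys.Take(50).ToList();

Console.WriteLine($"Pages fetched: {pagesFetched}, keys held in memory: {keys.Count}, keys used: {firstFifty.Count}");
```

Running it prints `Pages fetched: 5, keys held in memory: 5000, keys used: 50`: all five requests complete before the first key is used.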

Introducing Async Enumerable

Let's see how we can fix some of these issues by introducing AsyncEnumerables.

💡
Async enumerable is a way to work with collections of data that arrive over time, rather than all at once.

AsyncEnumerables are particularly useful when:

  1. You're dealing with large datasets
  2. Data is being generated or fetched in real-time
  3. You want to process items as they become available without waiting for the entire collection

An async enumerable produces its items asynchronously, as you iterate over it.

So, instead of doing all the I/O work upfront, it does it just when it's needed. As you loop through the items, there might be a slight delay in getting the next item(s), depending on whether it needs to be fetched over I/O.
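A tiny async iterator makes this on-demand behaviour visible. In this sketch (not from the original post), ProduceAsync is a hypothetical local function and Task.Delay stands in for the I/O needed to fetch each item. The log records when each item is produced and when the await foreach loop consumes it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// An async iterator: its body only runs as far as the next yield
// each time the consumer asks for another item.
async IAsyncEnumerable<int> ProduceAsync()
{
    for (var i = 1; i <= 3; i++)
    {
        await Task.Delay(100); // simulate I/O for each item
        log.Add($"produced {i}");
        yield return i;
    }
}

await foreach (var item in ProduceAsync())
{
    log.Add($"consumed {item}");
}

Console.WriteLine(string.Join(Environment.NewLine, log));
```

The log alternates (produced 1, consumed 1, produced 2, and so on) instead of producing all three items before the loop body runs.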

The code below changes our GetAllFileS3Objects method to return IAsyncEnumerable<S3Object>, streaming all the objects from the S3 bucket.

async IAsyncEnumerable<S3Object> GetAllFileS3Objects(AmazonS3Client amazonS3Client)
{
    ListObjectsV2Response? s3ObjectsResponse = null;
    do
    {
        s3ObjectsResponse = await amazonS3Client.ListObjectsV2Async(new ListObjectsV2Request()
        {
            BucketName = bucketName,
            ContinuationToken = s3ObjectsResponse?.NextContinuationToken,
            StartAfter = s3ObjectsResponse?.StartAfter
        });

        Console.WriteLine("Objects retrieved in this batch {0}", s3ObjectsResponse.S3Objects.Count);
        foreach (var s3Object in s3ObjectsResponse.S3Objects)
            yield return s3Object;
    } while (s3ObjectsResponse.IsTruncated);
}

The function no longer builds up the internal list of objects but instead yield returns the items one by one from the batch response.

By default, the S3 SDK's ListObjectsV2 call returns at most 1000 objects per request.

So when the collection is first enumerated, it makes the call, gets the first 1000 objects, loops through them, and yield returns them one by one until the 1000 items are consumed.

If the consumer needs more items, it makes another call to fetch the next batch of 1000 and repeats the process.

This continues until the consumer stops asking for items or the S3 bucket runs out of objects.
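The same paging behaviour can be sketched without an S3 bucket. Below, FetchPageAsync and the 5,000 in-memory keys are hypothetical stand-ins for ListObjectsV2, again returning 1,000 keys per page with a continuation token. Because the consumer stops after 1,500 keys, the iterator only ever requests two pages.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical paged source standing in for ListObjectsV2:
// 5,000 keys, at most 1,000 per request, plus a continuation token.
var allKeys = Enumerable.Range(1, 5000).Select(i => $"rating-{i:D4}.json").ToList();
const int pageSize = 1000;
var pagesFetched = 0;

async Task<(List<string> PageKeys, int? NextToken)> FetchPageAsync(int? token)
{
    await Task.Delay(10); // simulate network latency
    pagesFetched++;
    var start = token ?? 0;
    var pageKeys = allKeys.Skip(start).Take(pageSize).ToList();
    var nextToken = start + pageSize < allKeys.Count ? (int?)(start + pageSize) : null;
    return (pageKeys, nextToken);
}

// Lazy version: yield each key from the current page, and only request
// the next page once the consumer has worked through this one.
async IAsyncEnumerable<string> GetAllKeysAsync()
{
    int? token = null;
    do
    {
        var page = await FetchPageAsync(token);
        foreach (var pageKey in page.PageKeys)
            yield return pageKey;
        token = page.NextToken;
    } while (token is not null);
}

var consumed = 0;
await foreach (var key in GetAllKeysAsync())
{
    if (++consumed == 1500)
        break; // the consumer needs no more keys
}

Console.WriteLine($"Consumed {consumed} keys using {pagesFetched} page requests");
```

The first page satisfies the first 1,000 keys; asking for key 1,001 triggers the second request, and breaking at 1,500 means the remaining three pages are never fetched.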

Consuming an IAsyncEnumerable Collection

With the function GetAllFileS3Objects now returning an IAsyncEnumerable, we no longer need to await that call.

However, since enumerating the collection is itself an asynchronous operation, we move the await to the foreach statement instead, using await foreach.

async Task<List<ProductRating>> GetAllRatings(AmazonS3Client s3Client)
{
    var s3Objects = GetAllFileS3Objects(s3Client);
    ...
    await foreach (var s3Object in s3Objects)
    {
       ...
    }
    
    return productRatings;
}

Consuming an asynchronous stream of data using the await foreach statement in C#.

You can use await foreach to consume an asynchronous stream of data.
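Under the hood, await foreach is roughly shorthand for calling GetAsyncEnumerator, awaiting MoveNextAsync until it returns false, and awaiting DisposeAsync in a finally block. The sketch below (GetKeysAsync is a hypothetical local function, not part of the S3 example) consumes the same stream both ways and collects the same items.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical async stream of three keys.
async IAsyncEnumerable<string> GetKeysAsync()
{
    foreach (var name in new[] { "a.json", "b.json", "c.json" })
    {
        await Task.Yield(); // simulate an asynchronous fetch
        yield return name;
    }
}

// 1. The familiar form.
var viaForeach = new List<string>();
await foreach (var key in GetKeysAsync())
    viaForeach.Add(key);

// 2. Roughly what the compiler generates for it.
var viaEnumerator = new List<string>();
var enumerator = GetKeysAsync().GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
        viaEnumerator.Add(enumerator.Current);
}
finally
{
    // Disposal always runs, even if the loop exits early.
    await enumerator.DisposeAsync();
}

Console.WriteLine(string.Join(", ", viaForeach));
Console.WriteLine(string.Join(", ", viaEnumerator));
```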

The updated GetAllRatings method still builds up the entire list of ratings before Program.cs applies the filtering condition.

That's easily solved by moving the filtering condition and the limit of 50 down into the GetAllRatings method and applying them there.

async Task<List<ProductRating>> GetAllRatings(AmazonS3Client s3Client)
{
    var s3Objects = GetAllFileS3Objects(s3Client);
    ...
    
    await foreach (var s3Object in s3Objects)
    {
        var rating = await GetRating(s3Client, s3Object);
        ...
        if(rating.Rating == 3)
            filteredProductRatings.Add(rating);
            
        if (filteredProductRatings.Count == 50)
            return filteredProductRatings;
    }
    
    return filteredProductRatings;
}

Refactored code that reads only the minimum number of objects from the S3 bucket needed to match our criteria.

The code above now builds up a filteredProductRatings list that matches our criteria. As soon as it holds the required number of ratings, it exits the await foreach loop and returns them.
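The early return also limits S3 traffic. When a method returns from inside await foreach, the enumerator is disposed, which runs any finally block in the iterator and stops it from requesting further pages. The following sketch uses hypothetical in-memory data (1,000 ratings in pages of 100, where every fifth rating has 3 stars) in place of the S3 bucket and GetRating, and mirrors the structure of the refactored GetAllRatings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical data: 1,000 ratings served in pages of 100.
// Ratings with Id 2, 7, 12, ... have 3 stars.
var ratings = Enumerable.Range(0, 1000).Select(i => (Id: i, Stars: i % 5 + 1)).ToList();
const int pageSize = 100;
var pagesFetched = 0;
var iteratorDisposed = false;

async IAsyncEnumerable<(int Id, int Stars)> GetAllRatingsAsync()
{
    try
    {
        for (var start = 0; start < ratings.Count; start += pageSize)
        {
            await Task.Delay(10); // simulate one paged request
            pagesFetched++;
            foreach (var rating in ratings.Skip(start).Take(pageSize))
                yield return rating;
        }
    }
    finally
    {
        // Runs when the consumer stops early, because await foreach disposes the enumerator.
        iteratorDisposed = true;
    }
}

async Task<List<(int Id, int Stars)>> GetFirstThreeStarRatingsAsync(int count)
{
    var filtered = new List<(int Id, int Stars)>();
    await foreach (var rating in GetAllRatingsAsync())
    {
        if (rating.Stars == 3)
            filtered.Add(rating);

        if (filtered.Count == count)
            return filtered; // leaves the loop and disposes the iterator
    }
    return filtered;
}

var result = await GetFirstThreeStarRatingsAsync(50);
Console.WriteLine($"Found {result.Count} ratings after {pagesFetched} of 10 page requests; disposed: {iteratorDisposed}");
```

With this data the 50th 3-star rating (Id 247) sits in the third page, so the sketch prints `Found 50 ratings after 3 of 10 page requests; disposed: True`.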

In a future post, we will learn how to refactor GetAllRatings further using standard LINQ extension methods such as Select and Where.
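Until then, here is a rough idea of what such operators do over an async stream. WhereAsync and TakeAsync below are hypothetical hand-written helpers (local functions, not the LINQ API itself) that filter and limit an IAsyncEnumerable lazily; packages such as System.Linq.Async provide ready-made operators for this. GetStarRatingsAsync is a hypothetical source standing in for the S3 ratings.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var pulled = 0;

// Hypothetical source: star ratings arriving one at a time.
async IAsyncEnumerable<int> GetStarRatingsAsync()
{
    int[] allStars = { 5, 3, 1, 3, 4, 3, 2, 3 };
    foreach (var star in allStars)
    {
        await Task.Yield(); // simulate an asynchronous fetch
        pulled++;
        yield return star;
    }
}

// Hypothetical helper: passes through only items that match the predicate.
async IAsyncEnumerable<T> WhereAsync<T>(IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    await foreach (var item in source)
    {
        if (predicate(item))
            yield return item;
    }
}

// Hypothetical helper: stops pulling from the source after `count` items.
async IAsyncEnumerable<T> TakeAsync<T>(IAsyncEnumerable<T> source, int count)
{
    if (count <= 0)
        yield break;

    var taken = 0;
    await foreach (var item in source)
    {
        yield return item;
        if (++taken == count)
            yield break;
    }
}

var threeStarRatings = new List<int>();
await foreach (var rating in TakeAsync(WhereAsync(GetStarRatingsAsync(), r => r == 3), 2))
    threeStarRatings.Add(rating);

Console.WriteLine($"Collected {threeStarRatings.Count} three-star ratings after reading {pulled} items");
```

Because TakeAsync stops as soon as it has two matches, only four of the eight source items are ever read, the same "read only what you need" effect as the hand-written loop in GetAllRatings.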

But before that, look over the code base you are working on now and see if there are any places where you should switch to Async Enumerable. 👋
