Efficiently Handle SQS Messages with AWS Lambda Powertools Batch Utility

Rahul Pulikkot Nath

The AWS Lambda Powertools Batch processing utility makes it easy to process a batch of messages from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.

It provides a simple interface for processing each batch record, supports parallel processing, reports batch item failures to reduce the number of retries, and can be customized to meet your business needs.

In this post, let’s learn how to use the Lambda Powertools Batch processing utility to process messages in batches from Amazon SQS.

This article is sponsored by AWS and is part of my AWS Series.

Native Batch Processing With AWS Lambda

Before we get into the Powertools Batch utility, let’s look at how you would otherwise handle a batch of messages in an AWS Lambda Function.

Amazon SQS and AWS Lambda Triggers in .NET
Learn how to process SQS messages from AWS Lambda Function. We will learn how to set up and trigger a .NET Lambda Function using SQS and the various configurations associated.

The Lambda Function below handles an SQSEvent and processes each record it contains.

[LambdaFunction(Role = "@NotifyCustomersLambdaExecutionRole")]
public async Task<SQSBatchResponse> NotifyCustomers(
       SQSEvent evnt, ILambdaContext context)
{
    var response = new SQSBatchResponse()
    {
        BatchItemFailures = new List<SQSBatchResponse.BatchItemFailure>()
    };

    foreach (var message in evnt.Records)
    {
        try
        {
            await ProcessMessageAsync(message, context);
        }
        catch (Exception e)
        {
            // Log the failure and report this message's ID so that only it is retried.
            context.Logger.LogError(e.Message);
            response.BatchItemFailures.Add(new SQSBatchResponse.BatchItemFailure()
            {
                ItemIdentifier = message.MessageId
            });
        }
    }

    return response;
}

private async Task ProcessMessageAsync(
       SQSEvent.SQSMessage message, ILambdaContext context)
{
    await Task.Delay(1000);
    if (message.Body.Contains("Exception"))
        throw new Exception($"Invalid Message - {message.Body}");

    Console.WriteLine($"Processed Message - {message.Body}");
}

As it works through the batch, the function builds up a list of BatchItemFailures containing the IDs of the messages that failed to process.

Once all messages are processed, it returns the SQSBatchResponse carrying that list.
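To make the shape of that response concrete, here is a minimal, dependency-free sketch of the same collect-the-failures pattern. The PartialBatchDemo class and its Message record are hypothetical stand-ins for the real SQSEvent.SQSMessage and SQSBatchResponse types from Amazon.Lambda.SQSEvents, so you can run it without any AWS packages. It prints the JSON that Lambda expects for a partial batch response.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

public static class PartialBatchDemo
{
    // Hypothetical, simplified stand-in for SQSEvent.SQSMessage.
    public record Message(string MessageId, string Body);

    // Processes every message, collects the IDs of the ones that throw,
    // and renders them in the partial batch response JSON shape.
    public static string Process(IEnumerable<Message> batch)
    {
        var failedIds = new List<string>();

        foreach (var message in batch)
        {
            try
            {
                // Same failure rule as the article's ProcessMessageAsync.
                if (message.Body.Contains("Exception"))
                    throw new InvalidOperationException($"Invalid Message - {message.Body}");
            }
            catch (Exception)
            {
                failedIds.Add(message.MessageId);
            }
        }

        var response = new
        {
            batchItemFailures = failedIds
                .Select(id => new { itemIdentifier = id })
                .ToArray()
        };
        return JsonSerializer.Serialize(response);
    }

    public static void Main()
    {
        var batch = new[]
        {
            new Message("id-1", "Hello"),
            new Message("id-2", "Exception please"),
            new Message("id-3", "World"),
        };

        // prints {"batchItemFailures":[{"itemIdentifier":"id-2"}]}
        Console.WriteLine(Process(batch));
    }
}
```

An empty batchItemFailures list tells Lambda that the whole batch succeeded.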

How to Handle Exceptions When Processing SQS Messages in .NET Lambda Function
Learn how to handle exceptions, configure Dead Letter Queue, re-drive messages from Amazon SQS and how to enable batch error processing from a .NET Lambda Function.

By default, all the messages in a batch are retried even if only a subset of them fails to process, because Lambda treats the batch as a single unit that either succeeds or fails.

To allow the Lambda Function to return a partial-success response as above, we need to turn on 'Report Batch Item Failures' when setting up the SQS trigger for the Lambda Function.

Once turned on, the messages that were processed successfully are removed from the queue, and only the ones that failed to process are retried.
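If you define the trigger in a SAM-style serverless.template rather than in the console, the same setting is the FunctionResponseTypes property on the SQS event. The fragment below is only a sketch of the Events section of a function resource. The event name NotifyCustomersQueueEvent and the queue resource NotifyCustomersQueue are hypothetical names, and the batch size is an arbitrary example value.

```json
{
  "Events": {
    "NotifyCustomersQueueEvent": {
      "Type": "SQS",
      "Properties": {
        "Queue": { "Fn::GetAtt": ["NotifyCustomersQueue", "Arn"] },
        "BatchSize": 10,
        "FunctionResponseTypes": ["ReportBatchItemFailures"]
      }
    }
  }
}
```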

As you can see in the Function Handler code above, we have to write all the plumbing code that catches processing errors and returns the failed message IDs from the Function.

Let's see how the Batch Utility improves this experience of building Lambda Functions to handle SQS messages.

Powertools Batch Utility and SQS Events

To start using the Powertools Batch utility in our application, let's first add the AWS.Lambda.Powertools.BatchProcessing NuGet package.

You can decorate the Function Handler with the BatchProcessorAttribute to start using the Batch utility, as shown below.

[LambdaFunction(Role = "@NotifyCustomersLambdaExecutionRole")]
[BatchProcessor(RecordHandler = typeof(NotifyCustomerRecordHandler))]
public BatchItemFailuresResponse HandleNotifyCustomersBatch(
     SQSEvent evnt, ILambdaContext context)
{
    return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}

public class NotifyCustomerRecordHandler : ISqsRecordHandler
{
    public async Task<RecordHandlerResult> HandleAsync(
        SQSEvent.SQSMessage message, CancellationToken cancellationToken)
    {
        await Task.Delay(1000);
        if (message.Body.Contains("Exception"))
            throw new Exception($"Invalid Message - {message.Body}");

        Console.WriteLine($"Processed Message - {message.Body}");

        return RecordHandlerResult.None;
    }
}

The BatchProcessor attribute takes a RecordHandler property, which specifies the type that handles each individual record.

Since we are handling SQS records in this case, let's create a new class, NotifyCustomerRecordHandler, that implements the ISqsRecordHandler interface (which comes from the NuGet package).

The HandleAsync method takes in a single SQSMessage record item and contains the processing logic for the message.

The main handler function uses the SqsBatchProcessor helper class and returns its BatchItemFailuresResponse. This is the list of message IDs that failed to process.

The BatchProcessor attribute hides all the plumbing logic required to handle exceptions and build the list of failed messages.

Enabling Parallel Processing of Messages from SQS

The Batch Processor, by default, processes the messages sequentially, one after the other.

However, you can change this behavior if the message records within an SQS event can be processed in parallel.

[BatchProcessor(
  RecordHandler = typeof(NotifyCustomerRecordHandler), 
  BatchParallelProcessingEnabled = true, 
  MaxDegreeOfParallelism = -1)]

To configure parallel message processing, set BatchParallelProcessingEnabled to true and specify MaxDegreeOfParallelism on the BatchProcessor attribute, as shown above.

MaxDegreeOfParallelism accepts three kinds of values:

  • 1 (default) → sequential processing
  • value > 1 → enables parallel processing
  • -1 → parallelism configured to vCPU count of the Lambda function
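To get a feel for what a degree of parallelism means, here is a small, dependency-free sketch using plain .NET's Parallel.ForEachAsync (.NET 6 or later). It is only an analogy: I'm not claiming this is how the Batch utility is implemented internally. The ParallelismDemo class and its PeakConcurrencyAsync helper are hypothetical names made up for this example. The helper runs a few simulated records and reports the highest number that were in flight at the same time.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelismDemo
{
    // Runs `count` simulated records and returns the peak number
    // that were being processed at the same moment.
    public static async Task<int> PeakConcurrencyAsync(int count, int maxDegreeOfParallelism)
    {
        var gate = new object();
        int inFlight = 0, peak = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        await Parallel.ForEachAsync(Enumerable.Range(1, count), options, async (record, ct) =>
        {
            lock (gate)
            {
                inFlight++;
                peak = Math.Max(peak, inFlight);
            }

            await Task.Delay(100, ct); // simulated work for one record

            lock (gate)
            {
                inFlight--;
            }
        });

        return peak;
    }

    public static async Task Main()
    {
        // 1 keeps processing sequential; a larger value caps the concurrency.
        Console.WriteLine($"Peak with 1: {await PeakConcurrencyAsync(6, 1)}");
        Console.WriteLine($"Peak with 2: {await PeakConcurrencyAsync(6, 2)}");
    }
}
```

With a value of 1, the peak can never exceed one record at a time, and with 2 it never exceeds two. Parallel processing only pays off when the records are independent of each other and the work is slow enough (network calls, for example) to be worth overlapping.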

The Batch utility also supports Amazon Kinesis Data Streams and Amazon DynamoDB Streams. The only differences from SQS are the processor type and the record handler interface. I'll leave that as an exercise for you to explore.

Have a great day!
