Efficiently Handle SQS Messages with AWS Lambda Powertools Batch Utility
The AWS Lambda Powertools Batch processing utility makes it easy to process a batch of messages from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
It provides a simple interface for processing each record in a batch, supports parallel processing, reports batch item failures to reduce the number of retries, and can be customized to meet your business needs.
In this post, let’s learn how to use the Lambda Powertools Batch processing utility to process messages in batches from Amazon SQS.
This article is sponsored by AWS and is part of my AWS Series.
Native Batch Processing With AWS Lambda
Before we get into the Powertools Batch Utility library, let’s look at how you would otherwise handle a batch of messages in an AWS Lambda Function.
The Lambda Function below handles an SQSEvent and processes each of the records it contains.
```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.Lambda.Annotations;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;

// The containing class name is illustrative.
public class Functions
{
    [LambdaFunction(Role = "@NotifyCustomersLambdaExecutionRole")]
    public async Task<SQSBatchResponse> NotifyCustomers(
        SQSEvent evnt, ILambdaContext context)
    {
        var response = new SQSBatchResponse()
        {
            BatchItemFailures = new List<SQSBatchResponse.BatchItemFailure>()
        };

        foreach (var message in evnt.Records)
        {
            try
            {
                await ProcessMessageAsync(message, context);
            }
            catch (Exception e)
            {
                // Log the error and record this message as failed so that only it is retried.
                context.Logger.LogError(e.Message);
                response.BatchItemFailures.Add(new SQSBatchResponse.BatchItemFailure()
                {
                    ItemIdentifier = message.MessageId
                });
            }
        }

        return response;
    }

    private async Task ProcessMessageAsync(
        SQSEvent.SQSMessage message, ILambdaContext context)
    {
        await Task.Delay(1000); // simulate work
        if (message.Body.Contains("Exception"))
            throw new Exception($"Invalid Message - {message.Body}");
        Console.WriteLine($"Processed Message - {message.Body}");
    }
}
```
The handler builds up a list of BatchItemFailures containing the ID of each message that failed to process.
Once all messages have been processed, it returns this list in the SQSBatchResponse.
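For reference, the body a function returns for a partial batch response is a JSON object with a batchItemFailures array, where each entry holds a failed message's ID in itemIdentifier. The minimal sketch below assumes .NET 6 or later and uses only System.Text.Json to serialize an anonymous object of that shape for two made-up message IDs. It does not use the SQSBatchResponse type itself, which should serialize to the same shape when returned from a Lambda Function.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// Hypothetical IDs of messages whose processing threw an exception.
var failedMessageIds = new List<string> { "msg-2", "msg-5" };

// Partial batch response shape: { "batchItemFailures": [ { "itemIdentifier": "..." } ] }
var response = new
{
    batchItemFailures = failedMessageIds
        .Select(id => new { itemIdentifier = id })
        .ToList()
};

string json = JsonSerializer.Serialize(response);
Console.WriteLine(json);
// prints {"batchItemFailures":[{"itemIdentifier":"msg-2"},{"itemIdentifier":"msg-5"}]}
```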
By default, Lambda treats a batch as a single unit: if the invocation fails, every message in the batch becomes visible on the queue again and is retried, even if only some of the messages failed to process.
To allow the Lambda Function to return a partially successful response like the one above, we need to turn on 'Report Batch Item Failures' when setting up the SQS trigger for the Lambda Function. Without it, the list of failures in the response is ignored.
Once it is turned on, the messages that were processed successfully are deleted from the queue, and only the ones that failed are retried.
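To make the difference concrete, here is a small in-memory model of which messages come back for another attempt. It is not SQS or Lambda code: it assumes a hypothetical batch of five message IDs where two fail, and contrasts an invocation that fails as a whole with one that returns a partial batch response while 'Report Batch Item Failures' is enabled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical batch of message IDs delivered to a single invocation.
var batch = new List<string> { "m1", "m2", "m3", "m4", "m5" };
// Hypothetical subset of messages whose processing fails.
var failed = new HashSet<string> { "m2", "m4" };

// Default: a failing invocation deletes nothing, so the whole batch
// becomes visible on the queue again and is redelivered.
List<string> RetriedWhenInvocationFails(List<string> messages) => messages.ToList();

// 'Report Batch Item Failures' enabled: successful messages are deleted and
// only the IDs returned in batchItemFailures are redelivered.
List<string> RetriedWithPartialResponse(List<string> messages, HashSet<string> failures) =>
    messages.Where(id => failures.Contains(id)).ToList();

var retriedDefault = RetriedWhenInvocationFails(batch);
var retriedPartial = RetriedWithPartialResponse(batch, failed);

Console.WriteLine($"Default: {string.Join(", ", retriedDefault)}"); // Default: m1, m2, m3, m4, m5
Console.WriteLine($"Partial: {string.Join(", ", retriedPartial)}"); // Partial: m2, m4
```

In the default mode, the three successful messages are processed again on the retry, so handlers should be idempotent either way.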
As the NotifyCustomers handler shows, we have to write all the plumbing needed to catch processing errors and return the failed message IDs from the function.
Let's see how the Batch Utility improves this experience of building Lambda Functions to handle SQS messages.
Powertools Batch Utility and SQS Events
To start using the Powertools Batch utility in our application, let's first add the AWS.Lambda.Powertools.BatchProcessing NuGet package.
Then decorate the Function Handler with the BatchProcessorAttribute to start using the Batch utility, as shown below.
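```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Annotations;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using AWS.Lambda.Powertools.BatchProcessing;
using AWS.Lambda.Powertools.BatchProcessing.Sqs;

public class Functions
{
    [LambdaFunction(Role = "@NotifyCustomersLambdaExecutionRole")]
    [BatchProcessor(RecordHandler = typeof(NotifyCustomerRecordHandler))]
    public BatchItemFailuresResponse HandleNotifyCustomersBatch(
        SQSEvent evnt, ILambdaContext context)
    {
        // By the time this body runs, the attribute has processed the batch.
        return SqsBatchProcessor.Result.BatchItemFailuresResponse;
    }
}

public class NotifyCustomerRecordHandler : ISqsRecordHandler
{
    public async Task<RecordHandlerResult> HandleAsync(
        SQSEvent.SQSMessage message, CancellationToken cancellationToken)
    {
        await Task.Delay(1000, cancellationToken); // simulate work
        // Throwing marks this record as a batch item failure.
        if (message.Body.Contains("Exception"))
            throw new Exception($"Invalid Message - {message.Body}");
        Console.WriteLine($"Processed Message - {message.Body}");
        return RecordHandlerResult.None;
    }
}
```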
The BatchProcessor attribute takes a RecordHandler property, which specifies the type that processes each individual record.
Since we are handling SQS records, we create a new class, NotifyCustomerRecordHandler, that implements the ISqsRecordHandler interface from the NuGet package.
Its HandleAsync method takes a single SQSMessage and contains the processing logic for that message.
The main handler function uses the SqsBatchProcessor helper class to return the BatchItemFailuresResponse, which holds the IDs of the messages that failed to process.
The BatchProcessor attribute hides all the plumbing needed to handle exceptions and build the list of failed messages.
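Conceptually, the plumbing the attribute takes over is the try/catch loop from the native handler, generalised so that the per-record logic is passed in separately. The sketch below is not how Powertools is implemented; it is a dependency-free approximation using hypothetical names (ProcessBatchAsync, and a (MessageId, Body) tuple standing in for SQSEvent.SQSMessage) to show how separating the record handler from the batch loop leaves only business logic in the handler.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not part of Powertools): runs the record handler against every
// record sequentially and collects the IDs of records whose handler threw.
async Task<List<string>> ProcessBatchAsync<TRecord>(
    IEnumerable<TRecord> records,
    Func<TRecord, string> getId,
    Func<TRecord, Task> handleRecord)
{
    var failedIds = new List<string>();
    foreach (var record in records)
    {
        try
        {
            await handleRecord(record);
        }
        catch (Exception e)
        {
            Console.WriteLine($"Failed {getId(record)}: {e.Message}");
            failedIds.Add(getId(record));
        }
    }
    return failedIds;
}

// Stand-in for SQS messages: (MessageId, Body) tuples with made-up values.
var messages = new List<(string MessageId, string Body)>
{
    ("1", "Order placed"),
    ("2", "Exception: bad payload"),
    ("3", "Order shipped"),
};

// The "record handler": only business logic, mirroring NotifyCustomerRecordHandler.
async Task HandleAsync((string MessageId, string Body) message)
{
    await Task.Delay(10); // simulate work
    if (message.Body.Contains("Exception"))
        throw new Exception($"Invalid Message - {message.Body}");
    Console.WriteLine($"Processed Message - {message.Body}");
}

var failures = await ProcessBatchAsync(messages, m => m.MessageId, HandleAsync);
Console.WriteLine($"Batch item failures: {string.Join(", ", failures)}"); // prints: Batch item failures: 2
```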
Enabling Parallel Processing of Messages from SQS
The Batch Processor, by default, processes the messages sequentially, one after the other.
However, you can change this behavior if the message records within an SQS event can be processed in parallel.
```csharp
[BatchProcessor(
    RecordHandler = typeof(NotifyCustomerRecordHandler),
    BatchParallelProcessingEnabled = true,
    MaxDegreeOfParallelism = -1)]
```
To enable parallel message processing, set the BatchParallelProcessingEnabled property to true and specify the MaxDegreeOfParallelism property on the BatchProcessor attribute, as shown above.
MaxDegreeOfParallelism accepts the following values:
- 1 (default) → sequential processing
- a value greater than 1 → parallel processing, with up to that many records processed at a time
- -1 → parallelism set to the number of vCPUs available to the Lambda function
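The three MaxDegreeOfParallelism settings can be approximated with Parallel.ForEachAsync from .NET 6. This is an assumed stand-in, not how Powertools schedules records: the hypothetical ResolveParallelism helper maps -1 to Environment.ProcessorCount to mirror the vCPU behaviour described above (ParallelOptions itself treats -1 as "no limit"), and a counter records the highest number of records in flight at once so you can see the cap being respected.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical mapping from the attribute-style setting to a concrete limit:
// -1 -> processor count (mirrors the vCPU behaviour), 1 -> sequential, n > 1 -> up to n at once.
// Treating 0 or other negative values as sequential is an assumption of this sketch.
int ResolveParallelism(int maxDegreeOfParallelism) =>
    maxDegreeOfParallelism == -1 ? Environment.ProcessorCount : Math.Max(1, maxDegreeOfParallelism);

// Processes every body with at most the resolved number of records in flight and
// returns the IDs of failed records plus the peak concurrency that was observed.
async Task<(string[] Failures, int PeakConcurrency)> ProcessInParallelAsync(
    string[] bodies, int maxDegreeOfParallelism)
{
    var failures = new ConcurrentBag<string>();
    var gate = new object();
    int inFlight = 0, peak = 0;
    var options = new ParallelOptions
    {
        MaxDegreeOfParallelism = ResolveParallelism(maxDegreeOfParallelism)
    };
    var messages = bodies.Select((body, index) => (Id: index.ToString(), Body: body));

    await Parallel.ForEachAsync(messages, options, async (message, cancellationToken) =>
    {
        int current = Interlocked.Increment(ref inFlight);
        lock (gate) { peak = Math.Max(peak, current); }
        try
        {
            await Task.Delay(50, cancellationToken); // simulated work
            if (message.Body.Contains("Exception"))
                throw new InvalidOperationException($"Invalid Message - {message.Body}");
        }
        catch (InvalidOperationException)
        {
            // Catch per record so one bad message does not cancel the whole loop.
            failures.Add(message.Id);
        }
        finally
        {
            Interlocked.Decrement(ref inFlight);
        }
    });

    return (failures.OrderBy(id => id).ToArray(), peak);
}

string[] sampleBodies = { "Hello", "Exception in payload", "World", "Another Exception", "Done", "Bye" };

var sequential = await ProcessInParallelAsync(sampleBodies, 1);
var parallel = await ProcessInParallelAsync(sampleBodies, 3);

Console.WriteLine($"Sequential: failed [{string.Join(", ", sequential.Failures)}], peak {sequential.PeakConcurrency}");
Console.WriteLine($"Parallel:   failed [{string.Join(", ", parallel.Failures)}], peak {parallel.PeakConcurrency}");
```

Both runs report the same failures; only the peak concurrency differs. Parallel processing is only safe when records do not depend on each other's order, which is why it is opt-in.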
The Batch Utility also supports Amazon Kinesis Data Streams and DynamoDB Streams. The only differences from SQS are the processor type and the interface for handling individual records. I'll leave that exercise for you to explore.
Have a great day!
Rahul Nath Newsletter