
SQS Batch Processing (Deprecated)

Warning

The SQS batch module is now deprecated and will be removed in v2 of the library. Use the batch module, and check out migrating to the batch library for migration instructions.

The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS. The utility handles batch processing for both standard and FIFO SQS queues.

Key Features

  • Prevent successfully processed messages from being returned to SQS
  • A simple interface for individually processing messages from a batch

Background

When using SQS as a Lambda event source mapping, Lambda functions can be triggered with a batch of messages from SQS. If your function fails to process any message from the batch, the entire batch returns to your SQS queue, and your Lambda function will be triggered with the same batch again. With this utility, messages within a batch will be handled individually - only messages that were not successfully processed are returned to the queue.

Warning

While this utility lowers the chance of processing messages more than once, exactly-once processing is not guaranteed. We recommend implementing processing logic in an idempotent manner wherever possible. More details on how Lambda works with SQS can be found in the AWS documentation.
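
As a minimal sketch of the idempotency recommendation above, a handler can record the IDs of messages it has already applied and skip redelivered ones. The class and method names here are hypothetical and not part of the library, and the in-memory set is an assumption made for brevity; a real function would need a durable store.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of the Powertools library.
class IdempotentProcessor {
    // In-memory record of message IDs already handled. A real function would
    // need a durable store, since this state is lost with the execution environment.
    private final Set<String> processedIds = new HashSet<>();
    // Stand-in for real side effects such as database writes.
    private final List<String> appliedBodies = new ArrayList<>();

    /** Returns true if the message was applied now, false if it was a duplicate. */
    boolean processOnce(String messageId, String body) {
        if (processedIds.contains(messageId)) {
            return false; // redelivered message: skip the side effect
        }
        appliedBodies.add(body);
        processedIds.add(messageId); // mark as done only after the work succeeded
        return true;
    }

    List<String> appliedBodies() {
        return appliedBodies;
    }
}
```

Calling `processOnce` twice with the same message ID applies the body only once, so a message that is delivered again after a partial failure does not repeat its side effect.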

Install

The configuration differs slightly depending on your version of Java (Java 1.8 or Java 11+).

Maven (Java 11+)

<dependencies>
    ...
    <dependency>
        <groupId>software.amazon.lambda</groupId>
        <artifactId>powertools-sqs</artifactId>
        <version>1.18.0</version>
    </dependency>
    ...
</dependencies>
...
<!-- configure the aspectj-maven-plugin to compile-time weave (CTW) the aws-lambda-powertools-java aspects into your project -->
<build>
    <plugins>
        ...
        <plugin>
             <groupId>dev.aspectj</groupId>
             <artifactId>aspectj-maven-plugin</artifactId>
             <version>1.13.1</version>
             <configuration>
                 <source>11</source> <!-- or higher -->
                 <target>11</target> <!-- or higher -->
                 <complianceLevel>11</complianceLevel> <!-- or higher -->
                 <aspectLibraries>
                     <aspectLibrary>
                         <groupId>software.amazon.lambda</groupId>
                         <artifactId>powertools-sqs</artifactId>
                     </aspectLibrary>
                 </aspectLibraries>
             </configuration>
             <executions>
                 <execution>
                     <goals>
                         <goal>compile</goal>
                     </goals>
                 </execution>
             </executions>
        </plugin>
        ...
    </plugins>
</build>

Maven (Java 1.8)

<dependencies>
    ...
    <dependency>
        <groupId>software.amazon.lambda</groupId>
        <artifactId>powertools-sqs</artifactId>
        <version>1.18.0</version>
    </dependency>
    ...
</dependencies>
...
<!-- configure the aspectj-maven-plugin to compile-time weave (CTW) the aws-lambda-powertools-java aspects into your project -->
<build>
    <plugins>
        ...
        <plugin>
             <groupId>org.codehaus.mojo</groupId>
             <artifactId>aspectj-maven-plugin</artifactId>
             <version>1.14.0</version>
             <configuration>
                 <source>1.8</source>
                 <target>1.8</target>
                 <complianceLevel>1.8</complianceLevel>
                 <aspectLibraries>
                     <aspectLibrary>
                         <groupId>software.amazon.lambda</groupId>
                         <artifactId>powertools-sqs</artifactId>
                     </aspectLibrary>
                 </aspectLibraries>
             </configuration>
             <executions>
                 <execution>
                     <goals>
                         <goal>compile</goal>
                     </goals>
                 </execution>
             </executions>
        </plugin>
        ...
    </plugins>
</build>

Gradle (Java 11+)

    plugins {
        id 'java'
        id 'io.freefair.aspectj.post-compile-weaving' version '8.1.0'
    }

    repositories {
        mavenCentral()
    }

    dependencies {
        aspect 'software.amazon.lambda:powertools-sqs:1.18.0'
    }

    sourceCompatibility = 11 // or higher
    targetCompatibility = 11 // or higher

Gradle (Java 1.8)

    plugins {
        id 'java'
        id 'io.freefair.aspectj.post-compile-weaving' version '6.6.3'
    }

    repositories {
        mavenCentral()
    }

    dependencies {
        aspect 'software.amazon.lambda:powertools-sqs:1.18.0'
    }

    sourceCompatibility = 1.8
    targetCompatibility = 1.8

IAM Permissions

This utility requires additional permissions to work as expected. Lambda functions using this utility require the sqs:DeleteMessageBatch permission.

If you also use the nonRetryableExceptions attribute, the utility needs the sqs:GetQueueAttributes permission on the source SQS queue, plus sqs:SendMessage and sqs:SendMessageBatch on the configured dead letter queue.

If the source or dead letter queue is encrypted at rest with AWS Key Management Service (KMS), the function also needs the kms:GenerateDataKey and kms:Decrypt permissions on the KMS key used for encryption. Refer to the docs for more details.

Refer to the example project for a sample policy.

Processing messages from SQS

You can use either the SqsBatch annotation or the SqsUtils Utility API, which works as a fluent API.

Both have nearly the same behaviour when it comes to processing messages from the batch:

  • If the entire batch is processed successfully, meaning your Lambda handler returns successfully, we let SQS delete the batch to optimize your cost.
  • If the batch is only partially processed, meaning exceptions were raised within your SqsMessageHandler interface implementation, we will:
    • 1) Delete successfully processed messages from the queue by directly calling sqs:DeleteMessageBatch.
    • 2) If a message with a message group ID fails, stop processing the batch and return the remaining messages to SQS. This behaviour is required to handle SQS FIFO queues.
    • 3) If non-retryable exceptions are configured, immediately move messages that fail with one of those exceptions to the dead letter queue associated with the source SQS queue, or delete them from the source SQS queue if deleteNonRetryableMessageFromQueue is set to true.
    • 4) Raise SQSBatchProcessingException to ensure failed messages return to your SQS queue.
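
The outcomes listed above can be modelled in plain Java. The sketch below is a simulation only, not the library's implementation: `PartialBatchSketch`, `Message`, and `Outcome` are hypothetical names, no SQS calls are made, and the non-retryable case is left out. It only shows which message IDs would be deleted and which would return to the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java simulation for illustration; not the library's implementation.
class PartialBatchSketch {
    /** Hypothetical stand-in for an SQS message. */
    static final class Message {
        final String id;
        final String groupId; // null for standard queues, set for FIFO queues
        final String body;

        Message(String id, String groupId, String body) {
            this.id = id;
            this.groupId = groupId;
            this.body = body;
        }
    }

    /** IDs that would be deleted and IDs that would return to the queue. */
    static final class Outcome {
        final List<String> toDelete = new ArrayList<>();
        final List<String> toReturn = new ArrayList<>();
    }

    static Outcome process(List<Message> batch, Function<Message, String> handler) {
        Outcome outcome = new Outcome();
        boolean stopped = false;
        for (Message message : batch) {
            if (stopped) {
                // After a FIFO failure the rest of the batch is not processed.
                outcome.toReturn.add(message.id);
                continue;
            }
            try {
                handler.apply(message);
                outcome.toDelete.add(message.id);
            } catch (RuntimeException e) {
                outcome.toReturn.add(message.id);
                // A failed message with a group ID stops further processing.
                stopped = message.groupId != null;
            }
        }
        return outcome;
    }
}
```

With three standard-queue messages where only the second fails, `toDelete` holds the first and third IDs and `toReturn` holds the second. With the same messages in a FIFO group, `toDelete` holds only the first ID and the other two are returned.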

The only difference is that the SqsUtils Utility API gives you access to the return values from the processed messages if you need them. The SQSBatchProcessingException thrown by the utility gives access to both the successful and the failed messages, along with the exceptions that caused the failures.

Functional Interface SqsMessageHandler

Both the annotation and the SqsUtils Utility API require an implementation of the functional interface SqsMessageHandler.

This implementation is responsible for processing each individual message from the batch, and for raising an exception if it is unable to process a message.

Any successful (non-exception) return from your handler instructs the utility to queue up that message for deletion.

SqsBatch annotation

When using this annotation, you need to provide a class implementation of SqsMessageHandler that will process individual messages from the batch. It should raise an exception if it is unable to process the record.

All records in the batch will be passed to this handler for processing, even if exceptions are thrown. Here's the behaviour after completing the batch:

  • Any successfully processed messages are deleted from the queue via sqs:DeleteMessageBatch.
  • If the nonRetryableExceptions attribute is used, messages that fail with one of the configured exceptions are immediately moved to the dead letter queue associated with the source SQS queue, or deleted from the source SQS queue if deleteNonRetryableMessageFromQueue is set to true.
  • If any unprocessed messages are detected, we raise SQSBatchProcessingException to ensure failed messages return to your SQS queue.

Warning

You will not have access to the processed messages within the Lambda Handler - all processing logic will and should be performed by the implemented SqsMessageHandler#process() function.

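import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import software.amazon.lambda.powertools.sqs.SqsBatch;
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;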

public class AppSqsEvent implements RequestHandler<SQSEvent, String> {
    @Override
    @SqsBatch(SampleMessageHandler.class)
    public String handleRequest(SQSEvent input, Context context) {
        return "{\"statusCode\": 200}";
    }

    public class SampleMessageHandler implements SqsMessageHandler<Object> {

        @Override
        public String process(SQSMessage message) {
            // This will be called for each individual message from a batch
            // It should raise an exception if the message was not processed successfully
            String returnVal = doSomething(message.getBody());
            return returnVal;
        }
    }
}

SqsBatch annotation with nonRetryableExceptions

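import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import software.amazon.lambda.powertools.sqs.SqsBatch;
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;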

public class AppSqsEvent implements RequestHandler<SQSEvent, String> {
    @Override
    @SqsBatch(value = SampleMessageHandler.class, nonRetryableExceptions = {IllegalArgumentException.class})
    public String handleRequest(SQSEvent input, Context context) {
        return "{\"statusCode\": 200}";
    }

    public class SampleMessageHandler implements SqsMessageHandler<Object> {

        @Override
        public String process(SQSMessage message) {
            // This will be called for each individual message from a batch
            // It should raise an exception if the message was not processed successfully
            String returnVal = doSomething(message.getBody());

            // isValid is a hypothetical placeholder for your own business validation
            if (!isValid(message)) {
                throw new IllegalArgumentException("Failed business validation. No point in retrying. Move me to DLQ. " + message.getMessageId());
            }

            return returnVal;
        }
    }
}

SqsUtils Utility API

If you require access to the result of processed messages, you can use this utility. The result of calling SqsUtils#batchProcessor() is a list of all the return values from your SqsMessageHandler#process() function.

You can also use the utility in a functional way by providing an inline implementation of the functional interface method SqsMessageHandler#process().

public class AppSqsEvent implements RequestHandler<SQSEvent, List<String>> {
    @Override
    public List<String> handleRequest(SQSEvent input, Context context) {
        List<String> returnValues = SqsUtils.batchProcessor(input, SampleMessageHandler.class);

        return returnValues;
    }

    public class SampleMessageHandler implements SqsMessageHandler<String> {

        @Override
        public String process(SQSMessage message) {
            // This will be called for each individual message from a batch
            // It should raise an exception if the message was not processed successfully
            String returnVal = doSomething(message.getBody());
            return returnVal;
        }
    }
}

Inline implementation

public class AppSqsEvent implements RequestHandler<SQSEvent, List<String>> {

    @Override
    public List<String> handleRequest(SQSEvent input, Context context) {
        List<String> returnValues = SqsUtils.batchProcessor(input, (message) -> {
            // This will be called for each individual message from a batch
            // It should raise an exception if the message was not processed successfully
            String returnVal = doSomething(message.getBody());
            return returnVal;
        });

        return returnValues;
    }
}

Passing custom SqsClient

If you need a custom SqsClient, for example to set the region used by the SDK, you can pass your own SqsClient to the utility. It is then used by both the SqsBatch annotation and the SqsUtils Utility API.

public class AppSqsEvent implements RequestHandler<SQSEvent, List<String>> {
    static {
        SqsUtils.overrideSqsClient(SqsClient.builder()
                .build());
    }

    @Override
    public List<String> handleRequest(SQSEvent input, Context context) {
        List<String> returnValues = SqsUtils.batchProcessor(input, SampleMessageHandler.class);

        return returnValues;
    }

    public class SampleMessageHandler implements SqsMessageHandler<String> {

        @Override
        public String process(SQSMessage message) {
            // This will be called for each individual message from a batch
            // It should raise an exception if the message was not processed successfully
            String returnVal = doSomething(message.getBody());
            return returnVal;
        }
    }
}

Suppressing exceptions

If you want to disable the default behavior, where SQSBatchProcessingException is raised if any exception occurs, you can pass the suppressException boolean argument.

SqsBatch annotation

    @Override
    @SqsBatch(value = SampleMessageHandler.class, suppressException = true)
    public String handleRequest(SQSEvent input, Context context) {
        return "{\"statusCode\": 200}";
    }

SqsUtils Utility API

    @Override
    public List<String> handleRequest(SQSEvent input, Context context) {
        List<String> returnValues = SqsUtils.batchProcessor(input, true, SampleMessageHandler.class);

        return returnValues;
    }

Move non retryable messages to a dead letter queue

If you want certain exceptions to be treated as permanent failures during batch processing, that is, exceptions for which a retry will always fail, and you want the affected messages moved immediately to the dead letter queue associated with the source SQS queue, you can use SqsBatch#nonRetryableExceptions() to configure those exceptions.

If you want such messages to be deleted instead, set SqsBatch#deleteNonRetryableMessageFromQueue() to true. Its default value is false.

The same capability is also provided by the SqsUtils Utility API.
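
The routing decision described above can be sketched in plain Java. This is an illustration under stated assumptions, not the library's code: `FailureRouting` and `Action` are hypothetical names, and matching subclasses of a configured exception via `isInstance` is an assumption about how the comparison is done.

```java
import java.util.List;

// Hypothetical illustration of the routing rules; not the library's implementation.
class FailureRouting {
    enum Action { RETURN_TO_QUEUE, MOVE_TO_DLQ, DELETE }

    static Action route(Throwable failure,
                        List<Class<? extends Exception>> nonRetryableExceptions,
                        boolean deleteNonRetryableMessageFromQueue) {
        for (Class<? extends Exception> type : nonRetryableExceptions) {
            // Assumption: subclasses of a configured exception also count as non-retryable.
            if (type.isInstance(failure)) {
                return deleteNonRetryableMessageFromQueue ? Action.DELETE : Action.MOVE_TO_DLQ;
            }
        }
        // Anything else is treated as retryable and goes back to the source queue.
        return Action.RETURN_TO_QUEUE;
    }
}
```

With IllegalArgumentException configured, a message failing with that exception is routed to MOVE_TO_DLQ by default and to DELETE when the delete flag is true, while a message failing with an unrelated exception is routed to RETURN_TO_QUEUE.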

Info

Make sure the Lambda function has the permissions required by the utility. Refer to the IAM Permissions section.

SqsBatch annotation

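import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import software.amazon.lambda.powertools.sqs.SqsBatch;
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;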

public class AppSqsEvent implements RequestHandler<SQSEvent, String> {
    @Override
    @SqsBatch(value = SampleMessageHandler.class, nonRetryableExceptions = {IllegalArgumentException.class})
    public String handleRequest(SQSEvent input, Context context) {
        return "{\"statusCode\": 200}";
    }

    public class SampleMessageHandler implements SqsMessageHandler<Object> {

        @Override
        public String process(SQSMessage message) {
            // This will be called for each individual message from a batch
            // It should raise an exception if the message was not processed successfully
            String returnVal = doSomething(message.getBody());

            // isValid is a hypothetical placeholder for your own business validation
            if (!isValid(message)) {
                throw new IllegalArgumentException("Failed business validation. No point in retrying. Move me to DLQ. " + message.getMessageId());
            }

            return returnVal;
        }
    }
}

SqsUtils Utility API

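import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;
import software.amazon.lambda.powertools.sqs.SqsUtils;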

public class AppSqsEvent implements RequestHandler<SQSEvent, String> {
    @Override
    public String handleRequest(SQSEvent input, Context context) {

        // Pass the handler class defined below, followed by the non-retryable exception types
        SqsUtils.batchProcessor(input, SampleMessageHandler.class, IllegalArgumentException.class);

        return "{\"statusCode\": 200}";
    }

    public class SampleMessageHandler implements SqsMessageHandler<Object> {

        @Override
        public String process(SQSMessage message) {
            // This will be called for each individual message from a batch
            // It should raise an exception if the message was not processed successfully
            String returnVal = doSomething(message.getBody());

            // isValid is a hypothetical placeholder for your own business validation
            if (!isValid(message)) {
                throw new IllegalArgumentException("Failed business validation. No point in retrying. Move me to DLQ. " + message.getMessageId());
            }

            return returnVal;
        }
    }
}

Migrating to the Batch Library

The batch processing library provides a way to process messages and gracefully handle partial failures for SQS, Kinesis Streams, and DynamoDB Streams batch sources. In comparison to the legacy SQS Batch library, it relies on Lambda partial batch responses, which allows the library to provide a simpler, more reliable interface for processing batches.

To get started, check out the processing messages from SQS documentation. In most cases, you can keep your existing batch message handler function and wrap it with the new batch processing interface. Unlike this module, the batch processor uses partial batch responses to tell Lambda which messages have been processed and must be removed from the queue, so the return of the handler's process function must be returned to Lambda.

The new library also no longer requires the SQS:DeleteMessage action on the Lambda function's role policy, as Lambda itself now manages removal of messages from the queue.

Info

Some tunables from this library are no longer provided:

  • Non-retryable Exceptions - a partial batch response has no mechanism to indicate that a particular message should not be retried and should instead be moved to the DLQ. A message either succeeds, or fails and is retried. A message is moved to the DLQ once its normal retries are exhausted.
  • Suppress Exception - The new batch processor does not throw an exception when a handler fails. Instead, its result must be returned by your code from your message handler to Lambda, so that Lambda can manage the completed messages and retry behaviour.
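
For context, a Lambda partial batch response for SQS is a structure with a batchItemFailures list, where each entry carries the itemIdentifier (message ID) of a failed message; an empty list means the whole batch succeeded. The sketch below only illustrates that shape with plain maps. `PartialBatchResponseSketch` is a hypothetical name, and the new batch library builds this response for you.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Illustration of the response shape only; plain maps stand in for the real response type.
class PartialBatchResponseSketch {
    static Map<String, List<Map<String, String>>> build(List<String> failedMessageIds) {
        List<Map<String, String>> failures = new ArrayList<>();
        for (String messageId : failedMessageIds) {
            // One entry per failed message, keyed by its message ID.
            failures.add(Collections.singletonMap("itemIdentifier", messageId));
        }
        // Messages not listed here are treated as processed and removed by Lambda.
        return Collections.singletonMap("batchItemFailures", failures);
    }
}
```

Because the function only reports which messages failed, there is no place in this structure to mark a message as non-retryable, which is why that tunable is not available in the new library.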