SQS Batch Processing
The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.
Key Features
- Prevent successfully processed messages from being returned to SQS
- Simple interface for individually processing messages from a batch
- Build your own batch processor using the base classes
Background
When using SQS as a Lambda event source mapping, Lambda functions are triggered with a batch of messages from SQS.
If your function fails to process any message from the batch, the entire batch returns to your SQS queue, and your Lambda function is triggered with the same batch one more time.
With this utility, messages within a batch are handled individually - only messages that were not successfully processed are returned to the queue.
Warning
While this utility lowers the chance of processing messages more than once, it does not guarantee exactly-once processing. We recommend implementing processing logic in an idempotent manner wherever possible.
More details on how Lambda works with SQS can be found in the AWS documentation.
Getting started
IAM Permissions
Before you use this utility, your AWS Lambda function must have the sqs:DeleteMessageBatch permission to delete successfully processed messages directly from the queue.
Example using AWS Serverless Application Model (SAM)
Processing messages from SQS
You can use either the sqs_batch_processor decorator or, if you need access to the processed results, PartialSQSProcessor as a context manager.
You need to create a function to handle each record from the batch. We call it record_handler from here on.
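As a minimal sketch of what such a record_handler could look like, the function below receives one SQS record as a dictionary, parses its body, and either returns a result or raises. The messageId, receiptHandle and body keys are the standard fields of an SQS record in a Lambda event; the order_id field and the validation rule are hypothetical and exist only for illustration.

```python
import json


def record_handler(record: dict) -> dict:
    """Handle one SQS record; raising an exception marks the record as failed."""
    # In a Lambda SQS event, each record carries the message text under "body".
    payload = json.loads(record["body"])

    # Hypothetical validation rule, for illustration only.
    if "order_id" not in payload:
        raise ValueError(f"message {record['messageId']} has no order_id")

    # Any normal return counts as a successful result for this record.
    return {"order_id": payload["order_id"], "status": "processed"}


if __name__ == "__main__":
    sample = {"messageId": "msg-1", "receiptHandle": "rh-1", "body": '{"order_id": 42}'}
    print(record_handler(sample))
```

Keeping the handler free of queue-specific logic means the same function can be handed to either the decorator or the context manager.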
Tip
Any return from your record handler function that does not raise an exception instructs both the decorator and the context manager to queue that message for deletion.
If the entire batch succeeds, we let Lambda delete the records from the queue itself, for cost reasons.
Partial failure mechanics
All records in the batch are passed to this handler for processing, even if exceptions are thrown. Here's the behaviour after the batch completes:
- Any successfully processed messages are deleted from the queue via sqs:DeleteMessageBatch
- If any unprocessed messages are detected, we raise SQSBatchProcessingError to ensure failed messages return to your SQS queue
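The behaviour above can be sketched in plain Python. This is a simplified illustration, not the utility's implementation: BatchProcessingError stands in for SQSBatchProcessingError, process_batch is a hypothetical helper, and delete_batch is a hypothetical callable standing in for the SQS delete call. The Id and ReceiptHandle keys follow the entry shape that DeleteMessageBatch expects.

```python
class BatchProcessingError(Exception):
    """Stand-in for SQSBatchProcessingError in this sketch."""


def process_batch(records, record_handler, delete_batch):
    """Run record_handler on every record, then apply the partial-failure rules."""
    successes, failures = [], []
    for record in records:
        try:
            successes.append((record, record_handler(record)))
        except Exception as exc:
            # Keep going: one bad message must not stop the rest of the batch.
            failures.append((record, exc))

    if failures:
        # Delete only what succeeded, so that only failed messages come back.
        entries = [
            {"Id": record["messageId"], "ReceiptHandle": record["receiptHandle"]}
            for record, _ in successes
        ]
        if entries:
            delete_batch(entries)
        raise BatchProcessingError(f"{len(failures)} of {len(records)} records failed")

    # Whole batch succeeded: nothing is deleted here, Lambda removes the batch.
    return [result for _, result in successes]


if __name__ == "__main__":
    def handler(record):
        if record["body"] == "bad":
            raise ValueError("cannot process")
        return record["body"].upper()

    batch = [
        {"messageId": "1", "receiptHandle": "rh-1", "body": "ok"},
        {"messageId": "2", "receiptHandle": "rh-2", "body": "bad"},
    ]
    deleted = []  # collects the entries a real SQS client would receive
    try:
        process_batch(batch, handler, deleted.extend)
    except BatchProcessingError as err:
        print(err, deleted)
```

Raising after the delete call is what makes the invocation fail, so the messages that were not deleted become visible on the queue again.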
Warning
You will not have access to the processed messages within the Lambda handler.
All processing logic will, and should, be performed by the record_handler function.
Advanced
Choosing between decorator and context manager
They have nearly the same behaviour when it comes to processing messages from the batch:
- If the entire batch was processed successfully and your Lambda handler returned successfully, we let Lambda delete the batch from the queue to optimize your cost
- If the batch was only partially processed successfully, because exceptions were raised within your record handler, we will:
  1) Delete successfully processed messages from the queue by directly calling sqs:DeleteMessageBatch
  2) Raise SQSBatchProcessingError to ensure failed messages return to your SQS queue
The only difference is that PartialSQSProcessor gives you access to the processed messages if you need them.
Accessing processed messages
Use the PartialSQSProcessor context manager to access a list of all return values from your record_handler function.
Passing custom boto3 config
If you need to pass custom configuration, such as the region, to the SDK, you can pass your own botocore config object to the sqs_batch_processor decorator.
Suppressing exceptions
If you want to disable the default behaviour, where SQSBatchProcessingError is raised if there are any errors, you can pass the suppress_exception boolean argument.
Create your own partial processor
You can create your own partial batch processor by inheriting from the BasePartialProcessor class and implementing _prepare(), _clean() and _process_record().
- _process_record() - Handles all processing logic for each individual message of a batch, including calling the record_handler (self.handler)
- _prepare() - Called once as part of the processor initialization
- _clean() - Teardown logic called once after _process_record completes
You can then use this class as a context manager, or pass it to batch_processor to use as a decorator on your Lambda handler function.
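To illustrate how the three hooks fit together, here is a self-contained sketch that does not import the library. SketchPartialProcessor is a simplified stand-in for a base class with the hooks named above, and its context-manager wiring is an assumption about the general pattern; the real BasePartialProcessor may differ in its details. InMemoryProcessor and its failed and summary attributes are hypothetical.

```python
from abc import ABC, abstractmethod


class SketchPartialProcessor(ABC):
    """Simplified stand-in for a partial-processor base class (not the library's code)."""

    def __call__(self, records, handler):
        # Store the batch and the record handler, then hand back the processor.
        self.records, self.handler = records, handler
        return self

    def __enter__(self):
        self._prepare()  # runs once, before any record is processed
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._clean()  # runs once, after processing has finished

    def process(self):
        return [self._process_record(record) for record in self.records]

    @abstractmethod
    def _prepare(self): ...

    @abstractmethod
    def _clean(self): ...

    @abstractmethod
    def _process_record(self, record): ...


class InMemoryProcessor(SketchPartialProcessor):
    """Hypothetical processor that only collects failed records in a list."""

    def _prepare(self):
        self.failed = []

    def _clean(self):
        self.summary = f"{len(self.failed)} failed of {len(self.records)}"

    def _process_record(self, record):
        try:
            return ("success", self.handler(record))
        except Exception as exc:
            self.failed.append(record)
            return ("fail", str(exc))


if __name__ == "__main__":
    processor = InMemoryProcessor()
    # The second record makes the handler raise ZeroDivisionError.
    with processor([1, 0, 2], lambda n: 10 // n) as proc:
        results = proc.process()
    print(results, processor.summary)
```

Catching the exception inside _process_record is what lets the remaining records run; what happens with the failures afterwards (re-queueing, raising, logging) belongs in _clean.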
Integrating exception handling with Sentry.io
When using Sentry.io for error monitoring, you can override failure_handler to capture each processing exception.
Credits to Charles-Axel Dein