Batch Processing
The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
stateDiagram-v2
direction LR
BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
LambdaInit: Lambda invocation
BatchProcessor: Batch Processor
RecordHandler: Record Handler function
YourLogic: Your logic to process each batch item
LambdaResponse: Lambda response
BatchSource --> LambdaInit
LambdaInit --> BatchProcessor
BatchProcessor --> RecordHandler
state BatchProcessor {
[*] --> RecordHandler: Your function
RecordHandler --> YourLogic
}
RecordHandler --> BatchProcessor: Collect results
BatchProcessor --> LambdaResponse: Report items that failed processing
Key features¶
- Reports batch item failures to reduce the number of retries for a record upon errors
- Simple interface to process each batch record
- Build your own batch processor by extending primitives
Background¶
When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. The same batch is then retried until one of the following happens: a) your Lambda function returns a successful response, b) the records reach the maximum number of retry attempts, or c) the records expire.
journey
section Conditions
Successful response: 5: Success
Maximum retries: 3: Failure
Records expired: 1: Failure
This behavior changes when you enable the ReportBatchItemFailures feature in your Lambda function event source configuration:
- SQS queues. Only messages reported as failures will return to the queue for a retry, while successful ones will be deleted.
- Kinesis data streams and DynamoDB streams. A single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as the checkpoint.
Warning: This utility lowers the chance of processing records more than once; it does not guarantee that each record is processed only once.
We recommend implementing processing logic in an idempotent manner wherever possible.
You can find more details on how Lambda works with SQS, Kinesis, or DynamoDB in the AWS Documentation.
Getting started¶
Installation¶
Install the library in your project, for example with npm:
npm i @aws-lambda-powertools/batch
For this feature to work, you need to (1) configure your Lambda function event source to use ReportBatchItemFailures, and (2) return a specific response to report which records failed to be processed.
Use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned.
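As a rough illustration of the "specific response" mentioned above, the sketch below builds the object Lambda expects when ReportBatchItemFailures is enabled: a batchItemFailures array whose entries carry an itemIdentifier. The helper name toPartialBatchResponse is hypothetical and only used for illustration; the utility builds this response for you.

```typescript
// Shape Lambda expects when ReportBatchItemFailures is enabled.
interface BatchItemFailure {
  // SQS message ID, or the sequence number of a stream record
  itemIdentifier: string;
}

interface PartialBatchResponse {
  batchItemFailures: BatchItemFailure[];
}

// Hypothetical helper: turns a list of failed identifiers into the response.
function toPartialBatchResponse(failedIds: string[]): PartialBatchResponse {
  return {
    batchItemFailures: failedIds.map((id) => ({ itemIdentifier: id })),
  };
}

// An empty list tells Lambda that every record was processed successfully.
const allSucceeded = toPartialBatchResponse([]);

// A single entry asks Lambda to retry only that record.
const oneFailed = toPartialBatchResponse(['244fc6b4-87a3-44ab-83d2-361172410c3a']);
```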
Required resources¶
The remaining sections of the documentation will rely on these samples. For completeness, they demonstrate the IAM permissions and a dead-letter queue (DLQ) to which batch records are sent after 2 retries.
You do not need any additional IAM permissions to use this utility, except for what each event source requires.
template.yaml (one sample per event source: SQS, Kinesis Data Streams, DynamoDB Streams)
Processing messages from SQS¶
Processing batches from SQS works in three stages:
- Instantiate BatchProcessor and choose EventType.SQS for the event type
- Define your function to handle each batch record, and use the SQSRecord type annotation for autocompletion
- Use processPartialResponse to kick off processing
Info
This code example optionally uses Logger for completeness.
- Step 1. Creates a partial failure batch processor for SQS queues. See partial failure mechanics for details
- Step 2. Defines a function to receive one record at a time from the batch
- Step 3. Kicks off processing
The second record failed to be processed, therefore the processor added its message ID in the response.
FIFO queues¶
When using SQS FIFO queues, a batch may include messages from different group IDs.
By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.
Enable the skipGroupOnError option for seamless processing of messages from various group IDs. This setup ensures that messages from a failed group ID are sent back to SQS, enabling uninterrupted processing of messages from the subsequent group ID.
- Step 1. Creates a partial failure batch processor for SQS FIFO queues. See partial failure mechanics for details
Note
Note that SqsFifoPartialProcessor is synchronous using processPartialResponseSync.
If you need asynchronous processing while preserving the order of messages in the queue, use SqsFifoPartialProcessorAsync with processPartialResponse.
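To make the two FIFO behaviors described in this section concrete, here is a minimal, dependency-free sketch. It is not the library's implementation: the FifoMessage type and the processFifoBatch function are hypothetical, and the function only models which message IDs would be reported as failed with and without skipGroupOnError.

```typescript
interface FifoMessage {
  messageId: string;
  groupId: string;
  body: string;
}

// Returns the message IDs that would be reported as failed.
function processFifoBatch(
  messages: FifoMessage[],
  handleMessage: (message: FifoMessage) => void,
  skipGroupOnError: boolean
): string[] {
  const failedIds: string[] = [];
  const failedGroups: string[] = [];
  let stopped = false;

  for (const message of messages) {
    // Do no work after a hard stop, or for a group that has already failed.
    if (stopped || failedGroups.indexOf(message.groupId) !== -1) {
      failedIds.push(message.messageId);
      continue;
    }
    try {
      handleMessage(message);
    } catch {
      failedIds.push(message.messageId);
      if (skipGroupOnError) {
        // Only this group is affected; other groups keep being processed.
        failedGroups.push(message.groupId);
      } else {
        // Default: stop at the first failure to preserve ordering.
        stopped = true;
      }
    }
  }
  return failedIds;
}

const fifoBatch: FifoMessage[] = [
  { messageId: 'm1', groupId: 'A', body: 'ok' },
  { messageId: 'm2', groupId: 'A', body: 'boom' },
  { messageId: 'm3', groupId: 'B', body: 'ok' },
  { messageId: 'm4', groupId: 'A', body: 'ok' },
];

const failOnBoom = (message: FifoMessage): void => {
  if (message.body === 'boom') throw new Error('cannot process message');
};

// Default: m2 fails, so m3 and m4 are reported as failed without being processed.
const defaultFailures = processFifoBatch(fifoBatch, failOnBoom, false);

// skipGroupOnError: m3 (group B) is still processed; only group A is sent back.
const skipGroupFailures = processFifoBatch(fifoBatch, failOnBoom, true);
```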
Processing messages from Kinesis¶
Processing batches from Kinesis works in three stages:
- Instantiate BatchProcessor and choose EventType.KinesisDataStreams for the event type
- Define your function to handle each batch record, and use the KinesisStreamRecord type annotation for autocompletion
- Use processPartialResponse to kick off processing
Info
This code example optionally uses Logger for completeness.
- Creates a partial failure batch processor for Kinesis Data Streams. See partial failure mechanics for details
The second record failed to be processed, therefore the processor added its sequence number in the response.
Processing messages from DynamoDB¶
Processing batches from DynamoDB Streams works in three stages:
- Instantiate BatchProcessor and choose EventType.DynamoDBStreams for the event type
- Define your function to handle each batch record, and use the DynamoDBRecord type annotation for autocompletion
- Use processPartialResponse to kick off processing
Info
This code example optionally uses Logger for completeness.
- Creates a partial failure batch processor for DynamoDB Streams. See partial failure mechanics for details
The second record failed to be processed, therefore the processor added its sequence number in the response.
Error handling¶
By default, we catch any exception raised by your record handler function. This allows us to (1) continue processing the batch, (2) collect each batch item that failed processing, and (3) return the appropriate response correctly without failing your Lambda function execution.
- Any exception works here. See the Extending BatchProcessor section if you want to override this behavior.
- Exceptions raised in recordHandler will propagate to processPartialResponse. We catch them and include each failed batch item identifier in the response object (see Sample response tab).
Partial failure mechanics¶
All records in the batch will be passed to this handler for processing, even if exceptions are thrown. Here's the behavior after completing the batch:
- All records successfully processed. We will return an empty list of item failures: {'batchItemFailures': []}
- Partial success with some exceptions. We will return a list of all item IDs/sequence numbers that failed processing
- All records failed to be processed. We will throw a FullBatchFailureError error with a list of all the errors thrown while processing, unless throwOnFullBatchFailure is disabled.
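The three outcomes above can be modeled with a short, dependency-free sketch. This is an illustration under stated assumptions rather than the library's code: processBatchSketch, QueueMessage, and AllRecordsFailedError are hypothetical names, with AllRecordsFailedError standing in for FullBatchFailureError.

```typescript
interface QueueMessage {
  messageId: string;
  body: string;
}

interface ItemFailuresResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Hypothetical stand-in for the library's FullBatchFailureError.
class AllRecordsFailedError extends Error {
  public readonly recordErrors: Error[];

  constructor(recordErrors: Error[]) {
    super(`All ${recordErrors.length} records failed processing`);
    this.name = 'AllRecordsFailedError';
    this.recordErrors = recordErrors;
  }
}

function processBatchSketch(
  messages: QueueMessage[],
  handleMessage: (message: QueueMessage) => void,
  throwOnFullBatchFailure = true
): ItemFailuresResponse {
  const failedIds: string[] = [];
  const errors: Error[] = [];

  for (const message of messages) {
    try {
      handleMessage(message);
    } catch (error) {
      // Keep going: one bad record must not stop the rest of the batch.
      failedIds.push(message.messageId);
      errors.push(error instanceof Error ? error : new Error(String(error)));
    }
  }

  const everythingFailed = messages.length > 0 && errors.length === messages.length;
  if (everythingFailed && throwOnFullBatchFailure) {
    throw new AllRecordsFailedError(errors);
  }

  // Empty array when all succeeded; otherwise only the failed identifiers.
  return { batchItemFailures: failedIds.map((id) => ({ itemIdentifier: id })) };
}

const rejectFailBodies = (message: QueueMessage): void => {
  if (message.body === 'fail') throw new Error(`cannot process ${message.messageId}`);
};

const okMessage: QueueMessage = { messageId: '1', body: 'ok' };
const badMessage: QueueMessage = { messageId: '2', body: 'fail' };

// Partial success: only the second message is reported.
const partialResponse = processBatchSketch([okMessage, badMessage], rejectFailBodies);
```

Passing false as the third argument models disabling throwOnFullBatchFailure: a fully failed batch is then reported through batchItemFailures instead of raising an error.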
The following sequence diagrams explain how each Batch processor behaves under different scenarios.
SQS Standard¶
Read more about Batch Failure Reporting feature in AWS Lambda.
Sequence diagram to explain how BatchProcessor works with SQS Standard queues.
sequenceDiagram
autonumber
participant SQS queue
participant Lambda service
participant Lambda function
Lambda service->>SQS queue: Poll
Lambda service->>Lambda function: Invoke (batch event)
Lambda function->>Lambda service: Report some failed messages
activate SQS queue
Lambda service->>SQS queue: Delete successful messages
SQS queue-->>SQS queue: Failed messages return
Note over SQS queue,Lambda service: Process repeat
deactivate SQS queue
SQS mechanism with Batch Item Failures
SQS FIFO¶
Read more about Batch Failure Reporting feature in AWS Lambda.
Sequence diagram to explain how SqsFifoPartialProcessor works with SQS FIFO queues without the skipGroupOnError flag.
sequenceDiagram
autonumber
participant SQS queue
participant Lambda service
participant Lambda function
Lambda service->>SQS queue: Poll
Lambda service->>Lambda function: Invoke (batch event)
activate Lambda function
Lambda function-->Lambda function: Process 2 out of 10 batch items
Lambda function--xLambda function: Fail on 3rd batch item
Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
deactivate Lambda function
activate SQS queue
Lambda service->>SQS queue: Delete successful messages (1-2)
SQS queue-->>SQS queue: Failed messages return (3-10)
deactivate SQS queue
SQS FIFO mechanism with Batch Item Failures
Sequence diagram to explain how SqsFifoPartialProcessor works with SQS FIFO queues with the skipGroupOnError flag.
sequenceDiagram
autonumber
participant SQS queue
participant Lambda service
participant Lambda function
Lambda service->>SQS queue: Poll
Lambda service->>Lambda function: Invoke (batch event)
activate Lambda function
Lambda function-->Lambda function: Process 2 out of 10 batch items
Lambda function--xLambda function: Fail on 3rd batch item
Lambda function-->Lambda function: Process messages from another MessageGroupID
Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
deactivate Lambda function
activate SQS queue
Lambda service->>SQS queue: Delete successful messages processed
SQS queue-->>SQS queue: Failed messages return
deactivate SQS queue
SQS FIFO mechanism with Batch Item Failures
Kinesis and DynamoDB Streams¶
Read more about Batch Failure Reporting feature.
Sequence diagram to explain how BatchProcessor works with both Kinesis Data Streams and DynamoDB Streams.
For brevity, we will use Streams to refer to either service. For theory on stream checkpoints, see this blog post.
sequenceDiagram
autonumber
participant Streams
participant Lambda service
participant Lambda function
Lambda service->>Streams: Poll latest records
Lambda service->>Lambda function: Invoke (batch event)
activate Lambda function
Lambda function-->Lambda function: Process 2 out of 10 batch items
Lambda function--xLambda function: Fail on 3rd batch item
Lambda function-->Lambda function: Continue processing batch items (4-10)
Lambda function->>Lambda service: Report batch item as failure (3)
deactivate Lambda function
activate Streams
Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item
Lambda service->>Streams: Poll records starting from updated checkpoint
deactivate Streams
Kinesis and DynamoDB streams mechanism with single batch item failure
The behavior changes slightly when there are multiple item failures. The stream checkpoint is updated to the lowest sequence number reported.
Note that the batch item sequence number could be different from batch item number in the illustration.
sequenceDiagram
autonumber
participant Streams
participant Lambda service
participant Lambda function
Lambda service->>Streams: Poll latest records
Lambda service->>Lambda function: Invoke (batch event)
activate Lambda function
Lambda function-->Lambda function: Process 2 out of 10 batch items
Lambda function--xLambda function: Fail on 3-5 batch items
Lambda function-->Lambda function: Continue processing batch items (6-10)
Lambda function->>Lambda service: Report batch items as failure (3-5)
deactivate Lambda function
activate Streams
Lambda service->>Streams: Checkpoints to lowest sequence number
Lambda service->>Streams: Poll records starting from updated checkpoint
deactivate Streams
Kinesis and DynamoDB streams mechanism with multiple batch item failures
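The checkpoint selection shown in the diagrams above is performed by the Lambda service, not by your function. Purely as an illustration of "lowest sequence number reported", here is a small sketch; the function names and sample sequence numbers are made up, and it assumes sequence numbers are non-negative decimal strings without leading zeros that may be too large for a JavaScript number.

```typescript
// Compares two non-negative decimal strings without converting them to number,
// because sequence numbers can exceed Number.MAX_SAFE_INTEGER.
function compareSequenceNumbers(a: string, b: string): number {
  if (a.length !== b.length) return a.length - b.length; // fewer digits is smaller
  if (a === b) return 0;
  return a < b ? -1 : 1; // same length: lexicographic order equals numeric order
}

// Returns the lowest reported sequence number, or undefined when nothing failed.
function lowestSequenceNumber(failedSequenceNumbers: string[]): string | undefined {
  let lowest: string | undefined;
  for (const sequenceNumber of failedSequenceNumbers) {
    if (lowest === undefined || compareSequenceNumbers(sequenceNumber, lowest) < 0) {
      lowest = sequenceNumber;
    }
  }
  return lowest;
}

// Made-up sequence numbers for three failed batch items.
const reportedFailures = [
  '49545115243490985018280067714973144582180062593244200963',
  '49545115243490985018280067714973144582180062593244200961',
  '49545115243490985018280067714973144582180062593244200962',
];

// The stream would resume from this record on the next poll.
const checkpointCandidate = lowestSequenceNumber(reportedFailures);
```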
Async or sync processing¶
There are two processors you can use with this utility:
- BatchProcessor and processPartialResponse – Processes messages asynchronously
- BatchProcessorSync and processPartialResponseSync – Processes messages synchronously
In most cases your function will be async returning a Promise. Therefore, the BatchProcessor is the default processor handling your batch records asynchronously.
There are use cases where you need to process the batch records synchronously. For example, when you need to process multiple records at the same time without conflicting with one another.
For such cases we recommend using BatchProcessorSync and processPartialResponseSync.
Note that you need to match your processing function with the right batch processor:
- If your function is async returning a Promise, use BatchProcessor and processPartialResponse
- If your function is not async, use BatchProcessorSync and processPartialResponseSync
The difference between the two processors is in how they handle record processing:
- BatchProcessor: By default, it processes records in parallel using Promise.all(). However, it also offers an option to process records sequentially, preserving the order.
- BatchProcessorSync: Always processes records sequentially, ensuring the order is preserved by looping through each record one by one.
When is this useful?
For example, imagine you need to process multiple loyalty points and incrementally save in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently.
The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).
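The difference between parallel and sequential processing can be sketched without the library. In the example below, runInParallel, runSequentially, and makeRecorder are hypothetical helpers; the handlers never reject, so error collection is left out to keep the focus on ordering.

```typescript
type AsyncHandler<T> = (item: T) => Promise<void>;

// Parallel: start every handler immediately, then wait for all of them.
async function runInParallel<T>(items: T[], handle: AsyncHandler<T>): Promise<void> {
  await Promise.all(items.map((item) => handle(item)));
}

// Sequential: wait for each handler to finish before starting the next one.
async function runSequentially<T>(items: T[], handle: AsyncHandler<T>): Promise<void> {
  for (const item of items) {
    await handle(item);
  }
}

// Builds a handler that "works" for the given number of milliseconds and then
// appends that number to the log, so the log shows completion order.
function makeRecorder(log: number[]): AsyncHandler<number> {
  return (delayMs) =>
    new Promise<void>((resolve) => {
      setTimeout(() => {
        log.push(delayMs);
        resolve();
      }, delayMs);
    });
}

// Simulated work durations, in the order the records appear in the batch.
const delays = [30, 10, 20];
```

With runInParallel, the shortest task completes first, so completion order can differ from batch order. With runSequentially, completion order always matches batch order, at the cost of a longer total duration.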
Advanced¶
Accessing processed messages¶
Use the BatchProcessor directly in your function to access a list of all returned values from your recordHandler function.
- When successful. We will include a tuple with success, the result of recordHandler, and the batch record
- When failed. We will include a tuple with fail, exception as a string, and the batch record
- The processor requires the records array. This is typically handled by processPartialResponse.
- You need to register the batch, the recordHandler function, and optionally the context to access the Lambda context.
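The tuple shape described in this section can be illustrated with a small, dependency-free sketch. The collectResults function and the InboundMessage type are hypothetical; only the success/fail tuple layout is taken from the description above.

```typescript
interface InboundMessage {
  messageId: string;
  body: string;
}

// ['success', handler result, record] or ['fail', error as a string, record]
type SuccessEntry<R> = ['success', R, InboundMessage];
type FailureEntry = ['fail', string, InboundMessage];
type ProcessedEntry<R> = SuccessEntry<R> | FailureEntry;

function collectResults<R>(
  messages: InboundMessage[],
  handle: (message: InboundMessage) => R
): ProcessedEntry<R>[] {
  return messages.map((message): ProcessedEntry<R> => {
    try {
      return ['success', handle(message), message];
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      return ['fail', reason, message];
    }
  });
}

const processedEntries = collectResults(
  [
    { messageId: 'a', body: '21' },
    { messageId: 'b', body: 'not-a-number' },
  ],
  (message) => {
    const value = Number(message.body);
    if (isNaN(value)) throw new Error(`invalid body in ${message.messageId}`);
    return value * 2;
  }
);

// Each entry can be destructured to inspect the outcome per record.
for (const [outcome, detail, message] of processedEntries) {
  console.log(`${message.messageId}: ${outcome} (${String(detail)})`);
}
```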
Accessing Lambda Context¶
Within your recordHandler function, you might need access to the Lambda context to determine how much time you have left before your function times out.
We can automatically inject the Lambda context into your recordHandler as an optional second argument if you register it when using BatchProcessorSync or the processPartialResponseSync function.
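As a sketch of why the context is useful, the record handler below checks the remaining execution time before doing any work. The ContextLike interface is a minimal assumption that includes only getRemainingTimeInMillis from the Lambda context; the MIN_REMAINING_MS threshold, the WorkItem type, and the handler itself are hypothetical.

```typescript
// Minimal subset of the Lambda context used here; the real object has more fields.
interface ContextLike {
  getRemainingTimeInMillis(): number;
}

interface WorkItem {
  messageId: string;
  body: string;
}

// Hypothetical safety margin; tune it to how long one record takes to process.
const MIN_REMAINING_MS = 1000;

// The context arrives as an optional second argument.
function handleWithDeadline(item: WorkItem, lambdaContext?: ContextLike): string {
  if (lambdaContext && lambdaContext.getRemainingTimeInMillis() < MIN_REMAINING_MS) {
    // Failing fast lets the record be reported as failed and retried later,
    // instead of being cut off by a function timeout.
    throw new Error(`Not enough time left to process ${item.messageId}`);
  }
  return item.body.toUpperCase();
}

// Fake contexts for local experimentation.
const plentyOfTime: ContextLike = { getRemainingTimeInMillis: () => 30000 };
const almostTimedOut: ContextLike = { getRemainingTimeInMillis: () => 200 };
```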
Working with full batch failures¶
By default, the BatchProcessor will throw a FullBatchFailureError if all records in the batch fail to process. We do this to reflect the failure in your operational metrics.
When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the Lambda service will scale down the concurrency of your function, potentially impacting performance.
For these scenarios, you can set the throwOnFullBatchFailure option to false when calling processPartialResponse.
Extending BatchProcessor¶
You might want to bring custom logic to the existing BatchProcessor to slightly override how we handle successes and failures.
For these scenarios, you can subclass BatchProcessor and quickly override successHandler and failureHandler methods:
- successHandler() – Keeps track of successful batch records
- failureHandler() – Keeps track of failed batch records
Let's suppose you'd like to add a metric named BatchRecordFailures for each batch record that failed processing.
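The override pattern can be sketched without the library. MiniProcessor below is a hypothetical, much simplified stand-in for a processor with successHandler and failureHandler hooks (the real method signatures may differ), and a plain counter stands in for the BatchRecordFailures metric.

```typescript
interface JobRecord {
  id: string;
  payload: string;
}

// Hypothetical, simplified stand-in for a processor with overridable hooks.
class MiniProcessor {
  public readonly succeededIds: string[] = [];
  public readonly failedIds: string[] = [];

  protected successHandler(record: JobRecord): void {
    this.succeededIds.push(record.id);
  }

  protected failureHandler(record: JobRecord, _error: Error): void {
    this.failedIds.push(record.id);
  }

  public run(records: JobRecord[], handle: (record: JobRecord) => void): void {
    for (const record of records) {
      try {
        handle(record);
        this.successHandler(record);
      } catch (error) {
        this.failureHandler(record, error instanceof Error ? error : new Error(String(error)));
      }
    }
  }
}

class CountingProcessor extends MiniProcessor {
  // Stands in for a BatchRecordFailures metric.
  public batchRecordFailures = 0;

  protected override failureHandler(record: JobRecord, error: Error): void {
    this.batchRecordFailures += 1;
    // Keep the default bookkeeping so failed records are still reported.
    super.failureHandler(record, error);
  }
}

const countingProcessor = new CountingProcessor();
countingProcessor.run(
  [
    { id: 'a', payload: 'ok' },
    { id: 'b', payload: 'bad' },
    { id: 'c', payload: 'bad' },
  ],
  (record) => {
    if (record.payload === 'bad') throw new Error(`cannot process ${record.id}`);
  }
);
```

Calling the parent implementation inside the override matters: without it, the subclass would count failures but no longer track which records failed.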
Sequential async processing¶
By default, the BatchProcessor processes records in parallel using Promise.all(). However, if you need to preserve the order of records, you can set the processInParallel option to false to process records sequentially.
If the processInParallel option is not provided, the BatchProcessor will process records in parallel.
Create your own partial processor¶
You can create your own partial batch processor from scratch by inheriting the BasePartialProcessor class, and implementing the prepare(), clean(), processRecord() and processRecordSync() abstract methods.
classDiagram
direction LR
class BasePartialProcessor {
<<interface>>
+prepare()
+clean()
+processRecord(record: BaseRecord)
+processRecordSync(record: BaseRecord)
}
class YourCustomProcessor {
+prepare()
+clean()
+processRecord(record: BaseRecord)
+processRecordSync(record: BaseRecord)
}
BasePartialProcessor <|-- YourCustomProcessor : extends
Visual representation to bring your own processor
- prepare() – called once as part of the processor initialization
- clean() – teardown logic called once after processRecord completes
- processRecord() – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic
- processRecordSync() – handles all processing logic for each individual message of a batch, including calling the recordHandler (this.handler)
You can then use this class directly, or pass it to processPartialResponseSync to process the records in your Lambda handler function.
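The lifecycle described above (prepare once, process each record, clean once) can be sketched with a hypothetical abstract class. LifecycleProcessor is not the library's BasePartialProcessor, which has additional members; the sketch also omits the asynchronous processRecord() and only records the order in which the hooks run.

```typescript
interface StreamItem {
  id: string;
  value: number;
}

// Hypothetical abstract base mirroring the lifecycle described above.
abstract class LifecycleProcessor {
  protected abstract prepare(): void;
  protected abstract clean(): void;
  protected abstract processRecordSync(item: StreamItem): void;

  public processAll(items: StreamItem[]): void {
    this.prepare();
    try {
      for (const item of items) {
        this.processRecordSync(item);
      }
    } finally {
      // Teardown runs once, even if processing throws unexpectedly.
      this.clean();
    }
  }
}

class AuditingProcessor extends LifecycleProcessor {
  public readonly steps: string[] = [];
  public readonly failedIds: string[] = [];
  private readonly handle: (item: StreamItem) => void;

  constructor(handle: (item: StreamItem) => void) {
    super();
    this.handle = handle;
  }

  protected override prepare(): void {
    this.steps.push('prepare');
  }

  protected override clean(): void {
    this.steps.push('clean');
  }

  protected override processRecordSync(item: StreamItem): void {
    try {
      this.handle(item);
      this.steps.push(`ok:${item.id}`);
    } catch {
      // Track the failure and keep going with the rest of the batch.
      this.failedIds.push(item.id);
      this.steps.push(`fail:${item.id}`);
    }
  }
}

const auditingProcessor = new AuditingProcessor((item) => {
  if (item.value < 0) throw new Error('negative value');
});
auditingProcessor.processAll([
  { id: '1', value: 5 },
  { id: '2', value: -1 },
]);
```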
Tracing with AWS X-Ray¶
You can use Tracer to create subsegments for each batch record processed. To do so, you can open a new subsegment for each record, and close it when you're done processing it. When adding annotations and metadata to the subsegment, you can do so directly without calling tracer.setSegment(subsegment). This allows you to work with the subsegment directly and avoid having to either pass the parent subsegment around or have to restore the parent subsegment at the end of the record processing.
- Retrieve the current segment, then create a subsegment for the record being processed
- You can add annotations and metadata to the subsegment directly without calling tracer.setSegment(subsegment)
- Close the subsegment when you're done processing the record
Testing your code¶
As there are no external calls, you can unit test your code with BatchProcessorSync quite easily.
Example:
Given an SQS batch where the first batch record succeeds and the second fails processing, we should have a single item reported in the function response.