The batch processing utility provides a way to handle partial failures when processing batches of messages from SQS queues,
SQS FIFO queues, Kinesis Streams, or DynamoDB Streams.
stateDiagram-v2
direction LR
BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
LambdaInit: Lambda invocation
BatchProcessor: Batch Processor
RecordHandler: Record Handler function
YourLogic: Your logic to process each batch item
LambdaResponse: Lambda response
BatchSource --> LambdaInit
LambdaInit --> BatchProcessor
BatchProcessor --> RecordHandler
state BatchProcessor {
[*] --> RecordHandler: Your function
RecordHandler --> YourLogic
}
RecordHandler --> BatchProcessor: Collect results
BatchProcessor --> LambdaResponse: Report items that failed processing
Key Features
Reports batch item failures to reduce number of retries for a record upon errors
Simple interface to process each batch record
Integrates with Java Events library and the deserialization module
Build your own batch processor by extending primitives
Background
When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are
triggered with a batch of messages.
If your function fails to process any message from the batch, the entire batch returns to your queue or stream.
This same batch is then retried until either condition happens first:
a) your Lambda function returns a successful response,
b) record reaches maximum retry attempts, or
c) records expire.
journey
section Conditions
Successful response: 5: Success
Maximum retries: 3: Failure
Records expired: 1: Failure
This behavior changes when you enable Report Batch Item Failures feature in your Lambda function event source configuration:
SQS queues. Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
Kinesis data streams and DynamoDB streams.
Single reported failure will use its sequence number as the stream checkpoint.
Multiple reported failures will use the lowest sequence number as checkpoint.
With this utility, batch records are processed individually – only messages that failed to be processed
return to the queue or stream for a further retry. You simply build a BatchProcessor in your handler,
and return its response from the handler's processMessage implementation. Exceptions are handled
internally and an appropriate partial response for the message source is returned to Lambda for you.
Warning
While this utility lowers the chance of processing messages more than once, it is still not guaranteed.
We recommend implementing processing logic in an idempotent manner wherever possible, for instance,
by taking advantage of the idempotency module.
More details on how Lambda works with SQS can be found in the AWS documentation
We simply add powertools-batch to our build dependencies. Note - if you are using other Powertools
modules that require code-weaving, such as powertools-core, you will need to configure that also.
For this feature to work, you need to (1) configure your Lambda function event source to use ReportBatchItemFailures,
and (2) return a specific response to report which records failed to be processed.
You can use your preferred deployment framework to set the correct configuration while this utility,
while the powertools-batch module handles generating the response, which simply needs to be returned as the result of
your Lambda handler.
importcom.amazonaws.services.lambda.runtime.Context;importcom.amazonaws.services.lambda.runtime.RequestHandler;importcom.amazonaws.services.lambda.runtime.events.SQSBatchResponse;importcom.amazonaws.services.lambda.runtime.events.SQSEvent;importsoftware.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;importsoftware.amazon.lambda.powertools.batch.handler.BatchMessageHandler;publicclassSqsBatchHandlerimplementsRequestHandler<SQSEvent,SQSBatchResponse>{privatefinalBatchMessageHandler<SQSEvent,SQSBatchResponse>handler;publicSqsBatchHandler(){handler=newBatchMessageHandlerBuilder().withSqsBatchHandler().buildWithMessageHandler(this::processMessage,Product.class);}@OverridepublicSQSBatchResponsehandleRequest(SQSEventsqsEvent,Contextcontext){returnhandler.processBatch(sqsEvent,context);}privatevoidprocessMessage(Productp,Contextc){// Process the product}}
importcom.amazonaws.services.lambda.runtime.Context;importcom.amazonaws.services.lambda.runtime.RequestHandler;importcom.amazonaws.services.lambda.runtime.events.KinesisEvent;importcom.amazonaws.services.lambda.runtime.events.StreamsEventResponse;importsoftware.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;importsoftware.amazon.lambda.powertools.batch.handler.BatchMessageHandler;publicclassKinesisBatchHandlerimplementsRequestHandler<KinesisEvent,StreamsEventResponse>{privatefinalBatchMessageHandler<KinesisEvent,StreamsEventResponse>handler;publicKinesisBatchHandler(){handler=newBatchMessageHandlerBuilder().withKinesisBatchHandler().buildWithMessageHandler(this::processMessage,Product.class);}@OverridepublicStreamsEventResponsehandleRequest(KinesisEventkinesisEvent,Contextcontext){returnhandler.processBatch(kinesisEvent,context);}privatevoidprocessMessage(Productp,Contextc){// process the product}}
importcom.amazonaws.services.lambda.runtime.Context;importcom.amazonaws.services.lambda.runtime.RequestHandler;importcom.amazonaws.services.lambda.runtime.events.DynamodbEvent;importcom.amazonaws.services.lambda.runtime.events.StreamsEventResponse;importsoftware.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;importsoftware.amazon.lambda.powertools.batch.handler.BatchMessageHandler;publicclassDynamoDBStreamBatchHandlerimplementsRequestHandler<DynamodbEvent,StreamsEventResponse>{privatefinalBatchMessageHandler<DynamodbEvent,StreamsEventResponse>handler;publicDynamoDBStreamBatchHandler(){handler=newBatchMessageHandlerBuilder().withDynamoDbBatchHandler().buildWithRawMessageHandler(this::processMessage);}@OverridepublicStreamsEventResponsehandleRequest(DynamodbEventddbEvent,Contextcontext){returnhandler.processBatch(ddbEvent,context);}privatevoidprocessMessage(DynamodbEvent.DynamodbStreamRecorddynamodbStreamRecord,Contextcontext){// Process the change record}}
You must provide either a raw message handler, or a deserialized message handler. The raw message handler receives
the envelope record type relevant for the particular event source - for instance, the SQS event source provides
SQSMessage
instances. The deserialized message handler extracts the body from this envelope, and deserializes it to a user-defined
type. Note that deserialized message handlers are not relevant for the DynamoDB provider, as the format of the inner
message is fixed by DynamoDB.
In general, the deserialized message handler should be used unless you need access to information on the envelope.
123456789
publicvoidsetup(){BatchMessageHandler<SQSEvent,SQSBatchResponse>handler=newBatchMessageHandlerBuilder().withSqsBatchHandler().buildWithRawMessageHandler(this::processRawMessage);}privatevoidprocessRawMessage(SQSEvent.SQSMessagesqsMessage){// Do something with the raw message}
123456789
publicvoidsetup(){BatchMessageHandler<SQSEvent,SQSBatchResponse>handler=newBatchMessageHandlerBuilder().withSqsBatchHandler().buildWitMessageHandler(this::processRawMessage,Product.class);}privatevoidprocessMessage(Productproduct){// Do something with the deserialized message}
You can register a success or failure handler which will be invoked as each message is processed by the batch
module. This may be useful for reporting - for instance, writing metrics or logging failures.
These handlers are optional. Batch failures are handled by the module regardless of whether or not you
provide a custom failure handler.
Handlers can be provided when building the batch processor and are available for all event sources.
For instance for DynamoDB:
1 2 3 4 5 6 7 8 910111213
BatchMessageHandler<DynamodbEvent,StreamsEventResponse>handler=newBatchMessageHandlerBuilder().withDynamoDbBatchHandler().withSuccessHandler((m)->{// Success handler receives the raw messageLOGGER.info("Message with sequenceNumber {} was successfully processed",m.getDynamodb().getSequenceNumber());}).withFailureHandler((m,e)->{// Failure handler receives the raw message and the exception thrown.LOGGER.info("Message with sequenceNumber {} failed to be processed: {}",e.getDynamodb().getSequenceNumber(),e);}).buildWithMessageHander(this::processMessage);
Info
If the success handler throws an exception, the item it is processing will be marked as failed by the
batch processor.
If the failure handler throws, the batch processing will continue; the item it is processing has
already been marked as failed.
Both raw and deserialized message handlers can choose to take the Lambda context as an argument if they
need it, or not:
1 2 3 4 5 6 7 8 910
publicclassClassWithHandlers{privatevoidprocessMessage(Productproduct){// Do something with the raw message}privatevoidprocessMessageWithContext(Productproduct,Contextcontext){// Do something with the raw message and the lambda Context}}