Base

Batch processing utilities

Usage Documentation: Batch processing

CLASS DESCRIPTION
AsyncBatchProcessor

Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB asynchronously.

BasePartialBatchProcessor

Process batch and partially report failed items.

BasePartialProcessor

Abstract class for batch processors.

BatchProcessor

Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.

AsyncBatchProcessor

AsyncBatchProcessor(
    event_type: EventType,
    model: BatchTypeModels | None = None,
    raise_on_entire_batch_failure: bool = True,
)

Bases: BasePartialBatchProcessor

Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB asynchronously.

Example

Process batch triggered by SQS

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = AsyncBatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
async def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@async_batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Process batch triggered by Kinesis Data Streams

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_batch_processor
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = AsyncBatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
async def record_handler(record: KinesisStreamRecord):
    logger.info(record.kinesis.data_as_text)
    payload: dict = record.kinesis.data_as_json()
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@async_batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Process batch triggered by DynamoDB Streams

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = AsyncBatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
async def record_handler(record: DynamoDBRecord):
    logger.info(record.dynamodb.new_image)
    payload: dict = json.loads(record.dynamodb.new_image.get("item"))
    # alternatively:
    # changes: dict[str, Any] = record.dynamodb.new_image  # noqa: ERA001
    # payload = changes.get("Message")  # -> "<payload>"
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        processed_messages = processor.async_process()  # kick off async processing, returns list[tuple]

    return processor.response()
RAISES DESCRIPTION
BatchProcessingError

When all batch records fail processing and raise_on_entire_batch_failure is True

Limitations
  • Sync record handlers are not supported; use BatchProcessor instead.
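
Beyond the decorator shown above, the batch utility also exposes async_process_partial_response, which runs the async handler over the batch and builds the partial response in a single call. A minimal SQS sketch:

import json

from aws_lambda_powertools.utilities.batch import (
    AsyncBatchProcessor,
    EventType,
    async_process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = AsyncBatchProcessor(event_type=EventType.SQS)


async def record_handler(record: SQSRecord):
    if record.body:
        item: dict = json.loads(record.body)
    ...


def lambda_handler(event, context: LambdaContext):
    # process records with the async handler and return
    # {"batchItemFailures": [...]} for Lambda's partial batch response
    return async_process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
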
Source code in aws_lambda_powertools/utilities/batch/base.py, lines 234-272
def __init__(
    self,
    event_type: EventType,
    model: BatchTypeModels | None = None,
    raise_on_entire_batch_failure: bool = True,
):
    """Process batch and partially report failed items

    Parameters
    ----------
    event_type: EventType
        Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
    model: BatchTypeModels | None
        Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord
    raise_on_entire_batch_failure: bool
        Raise an exception when the entire batch has failed processing.
        When set to False, partial failures are reported in the response

    Exceptions
    ----------
    BatchProcessingError
        Raised when the entire batch has failed processing
    """
    self.event_type = event_type
    self.model = model
    self.raise_on_entire_batch_failure = raise_on_entire_batch_failure
    self.batch_response: PartialItemFailureResponse = copy.deepcopy(self.DEFAULT_RESPONSE)
    self._COLLECTOR_MAPPING = {
        EventType.SQS: self._collect_sqs_failures,
        EventType.KinesisDataStreams: self._collect_kinesis_failures,
        EventType.DynamoDBStreams: self._collect_dynamodb_failures,
    }
    self._DATA_CLASS_MAPPING = {
        EventType.SQS: SQSRecord,
        EventType.KinesisDataStreams: KinesisStreamRecord,
        EventType.DynamoDBStreams: DynamoDBRecord,
    }

    super().__init__()

BasePartialBatchProcessor

BasePartialBatchProcessor(
    event_type: EventType,
    model: BatchTypeModels | None = None,
    raise_on_entire_batch_failure: bool = True,
)

Bases: BasePartialProcessor

PARAMETER DESCRIPTION
event_type

Whether this is an SQS, DynamoDB Streams, or Kinesis Data Streams event

TYPE: EventType

model

Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, or KinesisDataStreamRecord

TYPE: BatchTypeModels | None DEFAULT: None

raise_on_entire_batch_failure

Raise an exception when the entire batch has failed processing. When set to False, partial failures are reported in the response

TYPE: bool DEFAULT: True
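
For pipelines where an all-failed batch should surface as item-level retries rather than an exception, disable the flag at construction time. A minimal sketch:

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType

# report failures via batchItemFailures even when every record fails,
# instead of raising BatchProcessingError
processor = BatchProcessor(
    event_type=EventType.SQS,
    raise_on_entire_batch_failure=False,
)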

Exceptions

BatchProcessingError

Raised when the entire batch has failed processing

METHOD DESCRIPTION
response

Batch items that failed processing, if any

Source code in aws_lambda_powertools/utilities/batch/base.py, lines 234-272
def __init__(
    self,
    event_type: EventType,
    model: BatchTypeModels | None = None,
    raise_on_entire_batch_failure: bool = True,
):
    """Process batch and partially report failed items

    Parameters
    ----------
    event_type: EventType
        Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
    model: BatchTypeModels | None
        Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord
    raise_on_entire_batch_failure: bool
        Raise an exception when the entire batch has failed processing.
        When set to False, partial failures are reported in the response

    Exceptions
    ----------
    BatchProcessingError
        Raised when the entire batch has failed processing
    """
    self.event_type = event_type
    self.model = model
    self.raise_on_entire_batch_failure = raise_on_entire_batch_failure
    self.batch_response: PartialItemFailureResponse = copy.deepcopy(self.DEFAULT_RESPONSE)
    self._COLLECTOR_MAPPING = {
        EventType.SQS: self._collect_sqs_failures,
        EventType.KinesisDataStreams: self._collect_kinesis_failures,
        EventType.DynamoDBStreams: self._collect_dynamodb_failures,
    }
    self._DATA_CLASS_MAPPING = {
        EventType.SQS: SQSRecord,
        EventType.KinesisDataStreams: KinesisStreamRecord,
        EventType.DynamoDBStreams: DynamoDBRecord,
    }

    super().__init__()

response

response() -> PartialItemFailureResponse

Batch items that failed processing, if any

Source code in aws_lambda_powertools/utilities/batch/base.py, lines 274-276
def response(self) -> PartialItemFailureResponse:
    """Batch items that failed processing, if any"""
    return self.batch_response
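
The returned value follows Lambda's partial batch response contract: a batchItemFailures list whose itemIdentifier entries tell the event source which records to retry. A shape illustration only; the identifiers below are made up:

# value returned by response() when two SQS records failed processing
{
    "batchItemFailures": [
        {"itemIdentifier": "059f36b4-87a3-44ab-83d2-661975830a7d"},
        {"itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"},
    ]
}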

BasePartialProcessor

BasePartialProcessor()

Bases: ABC

Abstract class for batch processors.

METHOD DESCRIPTION
async_process

Async call instance's handler for each record.

failure_handler

Keeps track of batch records that failed processing

process

Call instance's handler for each record.

success_handler

Keeps track of batch records that were processed successfully

Source code in aws_lambda_powertools/utilities/batch/base.py, lines 65-68
def __init__(self):
    self.success_messages: list[BatchEventTypes] = []
    self.fail_messages: list[BatchEventTypes] = []
    self.exceptions: list[ExceptionInfo] = []

async_process

async_process() -> list[tuple]

Async call instance's handler for each record.

Note

We keep the outer function synchronous so as not to force the Lambda handler itself to become async, which would impact customers' existing middlewares. Instead, we create an async closure to handle asynchrony.

We also handle edge cases like Lambda container thaw by reusing an existing event loop or creating a new one.

See: https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-shutdown

Source code in aws_lambda_powertools/utilities/batch/base.py, lines 104-138
def async_process(self) -> list[tuple]:
    """
    Async call instance's handler for each record.

    Note
    ----

    We keep the outer function synchronous to prevent making Lambda handler async, so to not impact
    customers' existing middlewares. Instead, we create an async closure to handle asynchrony.

    We also handle edge cases like Lambda container thaw by getting an existing or creating an event loop.

    See: https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-shutdown
    """

    async def async_process_closure():
        return list(await asyncio.gather(*[self._async_process_record(record) for record in self.records]))

    # WARNING
    # Do not use "asyncio.run(async_process())" due to Lambda container thaws/freeze, otherwise we might get "Event Loop is closed" # noqa: E501
    # Instead, get_event_loop() can also create one if a previous was erroneously closed
    # Mangum library does this as well. It's battle tested with other popular async-only frameworks like FastAPI
    # https://github.com/jordaneremieff/mangum/discussions/256#discussioncomment-2638946
    # https://github.com/jordaneremieff/mangum/blob/b85cd4a97f8ddd56094ccc540ca7156c76081745/mangum/protocols/http.py#L44

    # Let's prime the coroutine and decide
    # whether we create an event loop (Lambda) or schedule it as usual (non-Lambda)
    coro = async_process_closure()
    if os.getenv(constants.LAMBDA_TASK_ROOT_ENV):
        loop = asyncio.get_event_loop()  # NOTE: this might return an error starting in Python 3.12 in a few years
        task_instance = loop.create_task(coro)
        return loop.run_until_complete(task_instance)

    # Non-Lambda environment, run coroutine as usual
    return asyncio.run(coro)

failure_handler

failure_handler(
    record, exception: ExceptionInfo
) -> FailureResponse

Keeps track of batch records that failed processing

PARAMETER DESCRIPTION
record

record that failed processing

exception

Exception information containing type, value, and traceback (sys.exc_info())

TYPE: ExceptionInfo

RETURNS DESCRIPTION
FailureResponse

"fail", exceptions args, original record

Source code in aws_lambda_powertools/utilities/batch/base.py, lines 207-228
def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
    """
    Keeps track of batch records that failed processing

    Parameters
    ----------
    record: Any
        record that failed processing
    exception: ExceptionInfo
        Exception information containing type, value, and traceback (sys.exc_info())

    Returns
    -------
    FailureResponse
        "fail", exceptions args, original record
    """
    exception_string = f"{exception[0]}:{exception[1]}"
    entry = ("fail", exception_string, record)
    logger.debug(f"Record processing exception: {exception_string}")
    self.exceptions.append(exception)
    self.fail_messages.append(record)
    return entry

process

process() -> list[tuple]

Call instance's handler for each record.

Source code in aws_lambda_powertools/utilities/batch/base.py, lines 91-95
def process(self) -> list[tuple]:
    """
    Call instance's handler for each record.
    """
    return [self._process_record(record) for record in self.records]

success_handler

success_handler(record, result: Any) -> SuccessResponse

Keeps track of batch records that were processed successfully

PARAMETER DESCRIPTION
record

record that succeeded processing

result

result from record handler

TYPE: Any

RETURNS DESCRIPTION
SuccessResponse

"success", result, original record

Source code in aws_lambda_powertools/utilities/batch/base.py, lines 187-205
def success_handler(self, record, result: Any) -> SuccessResponse:
    """
    Keeps track of batch records that were processed successfully

    Parameters
    ----------
    record: Any
        record that succeeded processing
    result: Any
        result from record handler

    Returns
    -------
    SuccessResponse
        "success", result, original record
    """
    entry = ("success", result, record)
    self.success_messages.append(record)
    return entry

BatchProcessor

BatchProcessor(
    event_type: EventType,
    model: BatchTypeModels | None = None,
    raise_on_entire_batch_failure: bool = True,
)

Bases: BasePartialBatchProcessor

Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.

Example

Process batch triggered by SQS

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Process batch triggered by Kinesis Data Streams

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: KinesisStreamRecord):
    logger.info(record.kinesis.data_as_text)
    payload: dict = record.kinesis.data_as_json()
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Process batch triggered by DynamoDB Streams

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: DynamoDBRecord):
    logger.info(record.dynamodb.new_image)
    payload: dict = json.loads(record.dynamodb.new_image.get("item"))
    # alternatively:
    # changes: dict[str, Any] = record.dynamodb.new_image  # noqa: ERA001
    # payload = changes.get("Message")  # -> "<payload>"
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        processed_messages = processor.process()  # kick off processing, returns list[tuple]

    return processor.response()
RAISES DESCRIPTION
BatchProcessingError

When all batch records fail processing and raise_on_entire_batch_failure is True

Limitations
  • Async record handlers are not supported; use AsyncBatchProcessor instead.
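
The single-call helper for synchronous handlers is process_partial_response, which runs the handler over the batch and returns the partial response. A minimal SQS sketch:

import json

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record: SQSRecord):
    if record.body:
        item: dict = json.loads(record.body)
    ...


def lambda_handler(event, context: LambdaContext):
    # process records and return {"batchItemFailures": [...]}
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
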
Source code in aws_lambda_powertools/utilities/batch/base.py, lines 234-272
def __init__(
    self,
    event_type: EventType,
    model: BatchTypeModels | None = None,
    raise_on_entire_batch_failure: bool = True,
):
    """Process batch and partially report failed items

    Parameters
    ----------
    event_type: EventType
        Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
    model: BatchTypeModels | None
        Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord
    raise_on_entire_batch_failure: bool
        Raise an exception when the entire batch has failed processing.
        When set to False, partial failures are reported in the response

    Exceptions
    ----------
    BatchProcessingError
        Raised when the entire batch has failed processing
    """
    self.event_type = event_type
    self.model = model
    self.raise_on_entire_batch_failure = raise_on_entire_batch_failure
    self.batch_response: PartialItemFailureResponse = copy.deepcopy(self.DEFAULT_RESPONSE)
    self._COLLECTOR_MAPPING = {
        EventType.SQS: self._collect_sqs_failures,
        EventType.KinesisDataStreams: self._collect_kinesis_failures,
        EventType.DynamoDBStreams: self._collect_dynamodb_failures,
    }
    self._DATA_CLASS_MAPPING = {
        EventType.SQS: SQSRecord,
        EventType.KinesisDataStreams: KinesisStreamRecord,
        EventType.DynamoDBStreams: DynamoDBRecord,
    }

    super().__init__()