Module aws_lambda_powertools.utilities.batch

Batch processing utility

Expand source code
# -*- coding: utf-8 -*-

"""
Batch processing utility
"""

from aws_lambda_powertools.utilities.batch.base import (
    AsyncBatchProcessor,
    BasePartialBatchProcessor,
    BasePartialProcessor,
    BatchProcessor,
    EventType,
    FailureResponse,
    SuccessResponse,
)
from aws_lambda_powertools.utilities.batch.decorators import (
    async_batch_processor,
    async_process_partial_response,
    batch_processor,
    process_partial_response,
)
from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo
from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import (
    SqsFifoPartialProcessor,
)
from aws_lambda_powertools.utilities.batch.types import BatchTypeModels

# Public API of the batch utility package: every name listed here is imported
# above from one of the package's sub-modules and re-exported for consumers.
__all__ = (
    "async_batch_processor",
    "async_process_partial_response",
    "batch_processor",
    "process_partial_response",
    "BatchProcessor",
    "AsyncBatchProcessor",
    "BasePartialProcessor",
    "BasePartialBatchProcessor",
    "BatchTypeModels",
    "ExceptionInfo",
    "EventType",
    "FailureResponse",
    "SuccessResponse",
    "SqsFifoPartialProcessor",
)

Sub-modules

aws_lambda_powertools.utilities.batch.base

Batch processing utilities

aws_lambda_powertools.utilities.batch.decorators
aws_lambda_powertools.utilities.batch.exceptions

Batch processing exceptions

aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor
aws_lambda_powertools.utilities.batch.types

Functions

def async_batch_processor(handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable[..., Awaitable[Any]], processor: AsyncBatchProcessor)

Middleware to handle batch event processing

Notes

Consider using async_process_partial_response function for an easier experience.

Parameters

handler : Callable
Lambda's handler
event : Dict
Lambda's Event
context : LambdaContext
Lambda's Context
record_handler : Callable[…, Awaitable[Any]]
Callable to process each record from the batch
processor : AsyncBatchProcessor
Batch Processor to handle partial failure cases

Examples

Processes Lambda's event with an AsyncBatchProcessor

>>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor, EventType
>>> from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
>>>
>>> processor = AsyncBatchProcessor(event_type=EventType.SQS)
>>>
>>> async def async_record_handler(record: SQSRecord):
>>>     payload: str = record.body
>>>     return payload
>>>
>>> @async_batch_processor(record_handler=async_record_handler, processor=processor)
>>> def lambda_handler(event, context):
>>>     return processor.response()

Limitations

  • Sync batch processors. Use batch_processor instead.

Expand source code
@lambda_handler_decorator
def async_batch_processor(
    handler: Callable,
    event: Dict,
    context: LambdaContext,
    record_handler: Callable[..., Awaitable[Any]],
    processor: AsyncBatchProcessor,
):
    """
    Middleware that processes a batch event asynchronously, then calls the Lambda handler

    Notes
    -----
    Consider using async_process_partial_response function for an easier experience.

    Parameters
    ----------
    handler: Callable
        Lambda's handler; invoked with (event, context) once the batch has been processed
    event: Dict
        Lambda's Event; its "Records" entry is the batch handed to the processor
    context: LambdaContext
        Lambda's Context; forwarded to the processor as `lambda_context`
    record_handler: Callable[..., Awaitable[Any]]
        Async callable to process each record from the batch
    processor: AsyncBatchProcessor
        Batch Processor to handle partial failure cases

    Returns
    -------
    Whatever `handler(event, context)` returns

    Examples
    --------
    **Processes Lambda's event with an AsyncBatchProcessor**

        >>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor, EventType
        >>> from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
        >>>
        >>> processor = AsyncBatchProcessor(event_type=EventType.SQS)
        >>>
        >>> async def async_record_handler(record: SQSRecord):
        >>>     payload: str = record.body
        >>>     return payload
        >>>
        >>> @async_batch_processor(record_handler=async_record_handler, processor=processor)
        >>> def lambda_handler(event, context):
        >>>     return processor.response()

    Limitations
    -----------
    * Sync batch processors. Use `batch_processor` instead.
    """
    # An event without a "Records" key raises KeyError before the processor is invoked.
    with processor(event["Records"], record_handler, lambda_context=context):
        processor.async_process()

    # The wrapped handler typically returns processor.response() (see example above).
    return handler(event, context)
def async_process_partial_response(event: Dict, record_handler: Callable, processor: AsyncBatchProcessor, context: LambdaContext | None = None) ‑> PartialItemFailureResponse

Higher level function to handle batch event processing asynchronously.

Parameters

event : Dict
Lambda's original event
record_handler : Callable
Callable to process each record from the batch
processor : AsyncBatchProcessor
Batch Processor to handle partial failure cases
context : LambdaContext
Lambda's context, used to optionally inject in record handler

Returns

result : PartialItemFailureResponse
Lambda Partial Batch Response

Examples

Processes Lambda's SQS event

from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = AsyncBatchProcessor(EventType.SQS)

async def record_handler(record: SQSRecord):
    return record.body

def handler(event, context):
    return async_process_partial_response(
        event=event, record_handler=record_handler, processor=processor, context=context
    )

Limitations

  • Sync batch processors. Use process_partial_response instead.

Expand source code
def async_process_partial_response(
    event: Dict,
    record_handler: Callable,
    processor: AsyncBatchProcessor,
    context: LambdaContext | None = None,
) -> PartialItemFailureResponse:
    """
    Process a batch event asynchronously and return the partial batch response.

    Parameters
    ----------
    event: Dict
        Lambda's original event; a missing "Records" key is treated as an empty batch
    record_handler: Callable
        Async callable to process each record from the batch
    processor: AsyncBatchProcessor
        Batch Processor to handle partial failure cases
    context: LambdaContext
        Lambda's context, used to optionally inject in record handler

    Returns
    -------
    result: PartialItemFailureResponse
        Lambda Partial Batch Response

    Raises
    ------
    ValueError
        When `event` has no `.get` method (i.e., it is not a dict-like batch event)

    Examples
    --------
    **Processes Lambda's SQS event**

    ```python
    from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_process_partial_response
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

    processor = AsyncBatchProcessor(EventType.SQS)

    async def record_handler(record: SQSRecord):
        return record.body

    def handler(event, context):
        return async_process_partial_response(
            event=event, record_handler=record_handler, processor=processor, context=context
        )
    ```

    Limitations
    -----------
    * Sync batch processors. Use `process_partial_response` instead.
    """
    try:
        batch: List[Dict] = event.get("Records", [])
    except AttributeError:
        # event is not dict-like; point the caller at sample events for the supported sources
        supported_sources = ", ".join(EventType.__members__)
        docs_url = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs"  # noqa: E501 # long-line
        message = (
            f"Invalid event format. Please ensure batch event is a valid {processor.event_type.value} event. \n"
            f"See sample events in our documentation for either {supported_sources}: \n {docs_url}"
        )
        raise ValueError(message)

    with processor(batch, record_handler, context):
        processor.async_process()

    return processor.response()
def batch_processor(handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: BatchProcessor)

Middleware to handle batch event processing

Notes

Consider using process_partial_response function for an easier experience.

Parameters

handler : Callable
Lambda's handler
event : Dict
Lambda's Event
context : LambdaContext
Lambda's Context
record_handler : Callable
Callable or coroutine to process each record from the batch
processor : BatchProcessor
Batch Processor to handle partial failure cases

Examples

Processes Lambda's event with a BatchProcessor

>>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType
>>> from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
>>>
>>> processor = BatchProcessor(EventType.SQS)
>>>
>>> def record_handler(record):
>>>     return record["body"]
>>>
>>> @batch_processor(record_handler=record_handler, processor=processor)
>>> def handler(event, context):
>>>     return processor.response()

Limitations

  • Async batch processors. Use async_batch_processor instead.

Expand source code
@lambda_handler_decorator
def batch_processor(
    handler: Callable,
    event: Dict,
    context: LambdaContext,
    record_handler: Callable,
    processor: BatchProcessor,
):
    """
    Middleware that processes a batch event, then calls the Lambda handler

    Notes
    -----
    Consider using process_partial_response function for an easier experience.

    Parameters
    ----------
    handler: Callable
        Lambda's handler; invoked with (event, context) once the batch has been processed
    event: Dict
        Lambda's Event; its "Records" entry is the batch handed to the processor
    context: LambdaContext
        Lambda's Context; forwarded to the processor as `lambda_context`
    record_handler: Callable
        Callable or coroutine to process each record from the batch
    processor: BatchProcessor
        Batch Processor to handle partial failure cases

    Returns
    -------
    Whatever `handler(event, context)` returns

    Examples
    --------
    **Processes Lambda's event with a BatchProcessor**

        >>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType
        >>> from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
        >>>
        >>> processor = BatchProcessor(EventType.SQS)
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @batch_processor(record_handler=record_handler, processor=processor)
        >>> def handler(event, context):
        >>>     return processor.response()

    Limitations
    -----------
    * Async batch processors. Use `async_batch_processor` instead.
    """
    # An event without a "Records" key raises KeyError before the processor is invoked.
    with processor(event["Records"], record_handler, lambda_context=context):
        processor.process()

    # The wrapped handler typically returns processor.response() (see example above).
    return handler(event, context)
def process_partial_response(event: Dict, record_handler: Callable, processor: BasePartialBatchProcessor, context: LambdaContext | None = None) ‑> PartialItemFailureResponse

Higher level function to handle batch event processing.

Parameters

event : Dict
Lambda's original event
record_handler : Callable
Callable to process each record from the batch
processor : BasePartialBatchProcessor
Batch Processor to handle partial failure cases
context : LambdaContext
Lambda's context, used to optionally inject in record handler

Returns

result : PartialItemFailureResponse
Lambda Partial Batch Response

Examples

Processes Lambda's SQS event

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(EventType.SQS)

def record_handler(record: SQSRecord):
    return record.body

def handler(event, context):
    return process_partial_response(
        event=event, record_handler=record_handler, processor=processor, context=context
    )

Limitations

  • Async batch processors. Use async_process_partial_response instead.

Expand source code
def process_partial_response(
    event: Dict,
    record_handler: Callable,
    processor: BasePartialBatchProcessor,
    context: LambdaContext | None = None,
) -> PartialItemFailureResponse:
    """
    Process a batch event and return the partial batch response.

    Parameters
    ----------
    event: Dict
        Lambda's original event; a missing "Records" key is treated as an empty batch
    record_handler: Callable
        Callable to process each record from the batch
    processor: BasePartialBatchProcessor
        Batch Processor to handle partial failure cases
    context: LambdaContext
        Lambda's context, used to optionally inject in record handler

    Returns
    -------
    result: PartialItemFailureResponse
        Lambda Partial Batch Response

    Raises
    ------
    ValueError
        When `event` has no `.get` method (i.e., it is not a dict-like batch event)

    Examples
    --------
    **Processes Lambda's SQS event**

    ```python
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

    processor = BatchProcessor(EventType.SQS)

    def record_handler(record: SQSRecord):
        return record.body

    def handler(event, context):
        return process_partial_response(
            event=event, record_handler=record_handler, processor=processor, context=context
        )
    ```

    Limitations
    -----------
    * Async batch processors. Use `async_process_partial_response` instead.
    """
    try:
        batch: List[Dict] = event.get("Records", [])
    except AttributeError:
        # event is not dict-like; point the caller at sample events for the supported sources
        supported_sources = ", ".join(EventType.__members__)
        docs_url = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs"  # noqa: E501 # long-line
        message = (
            f"Invalid event format. Please ensure batch event is a valid {processor.event_type.value} event. \n"
            f"See sample events in our documentation for either {supported_sources}: \n {docs_url}"
        )
        raise ValueError(message)

    with processor(batch, record_handler, context):
        processor.process()

    return processor.response()

Classes

class AsyncBatchProcessor (event_type: EventType, model: Optional[ForwardRef('BatchTypeModels')] = None)

Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB asynchronously.

Example

Process batch triggered by SQS

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = AsyncBatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
async def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@async_batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Process batch triggered by Kinesis Data Streams

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_batch_processor
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = AsyncBatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
async def record_handler(record: KinesisStreamRecord):
    logger.info(record.kinesis.data_as_text)
    payload: dict = record.kinesis.data_as_json()
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@async_batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Process batch triggered by DynamoDB Data Streams

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = AsyncBatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
async def record_handler(record: DynamoDBRecord):
    logger.info(record.dynamodb.new_image)
    payload: dict = json.loads(record.dynamodb.new_image.get("item"))
    # alternatively:
    # changes: Dict[str, Any] = record.dynamodb.new_image  # noqa: ERA001
    # payload = changes.get("Message") -> "<payload>"
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with processor(batch, record_handler):
        processed_messages = processor.async_process() # kick off processing, return list[tuple]

    return processor.response()

Raises

BatchProcessingError
When all batch records fail processing

Limitations

  • Sync record handler not supported, use BatchProcessor instead.

Process batch and partially report failed items

Parameters

event_type : EventType
Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
model : Optional["BatchTypeModels"]
Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord

Exceptions

BatchProcessingError Raised when the entire batch has failed processing

Expand source code
class AsyncBatchProcessor(BasePartialBatchProcessor):
    """Asynchronously process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.

    Example
    -------

    ## Process batch triggered by SQS

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_batch_processor
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = AsyncBatchProcessor(event_type=EventType.SQS)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    async def record_handler(record: SQSRecord):
        payload: str = record.body
        if payload:
            item: dict = json.loads(payload)
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @async_batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```

    ## Process batch triggered by Kinesis Data Streams

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_batch_processor
    from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = AsyncBatchProcessor(event_type=EventType.KinesisDataStreams)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    async def record_handler(record: KinesisStreamRecord):
        logger.info(record.kinesis.data_as_text)
        payload: dict = record.kinesis.data_as_json()
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @async_batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```

    ## Process batch triggered by DynamoDB Data Streams

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType
    from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = AsyncBatchProcessor(event_type=EventType.DynamoDBStreams)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    async def record_handler(record: DynamoDBRecord):
        logger.info(record.dynamodb.new_image)
        payload: dict = json.loads(record.dynamodb.new_image.get("item"))
        # alternatively:
        # changes: Dict[str, Any] = record.dynamodb.new_image  # noqa: ERA001
        # payload = changes.get("Message") -> "<payload>"
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    def lambda_handler(event, context: LambdaContext):
        batch = event["Records"]
        with processor(batch, record_handler):
            processed_messages = processor.async_process() # kick off processing, return list[tuple]

        return processor.response()
    ```


    Raises
    ------
    BatchProcessingError
        When all batch records fail processing

    Limitations
    -----------
    * Sync record handler not supported, use BatchProcessor instead.
    """

    def _process_record(self, record: dict):
        # Synchronous processing is intentionally unavailable on the async processor.
        raise NotImplementedError()

    async def _async_process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Await the instance's handler for a single record and register the outcome

        Parameters
        ----------
        record: dict
            A batch record to be processed.

        Returns
        -------
        Union[SuccessResponse, FailureResponse]
            Result of the success handler, or of the failure handler when anything raised
        """
        data: Optional["BatchTypeModels"] = None
        try:
            data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)

            # Only pass the Lambda context along when the handler is known to accept it
            pending = (
                self.handler(record=data, lambda_context=self.lambda_context)
                if self._handler_accepts_lambda_context
                else self.handler(record=data)
            )
            result = await pending

            return self.success_handler(record=record, result=result)
        except Exception as exc:
            # NOTE: Pydantic is an optional dependency, but when it is used and a poison pill scenario
            # happens we need to handle that exception differently.
            # We look for a public attribute on validation errors coming from Pydantic (subclass or not)
            # and compare it against the model that triggered the exception in the first place.

            # Pydantic v1 raises a ValidationError with ErrorWrappers and stores the model in a class variable.
            # Pydantic v2 simplifies this by exposing the model name directly through a title variable.
            failing_model = getattr(exc, "model", None) or getattr(exc, "title", None)
            own_model_name = getattr(self.model, "__name__", None)

            if failing_model not in (self.model, own_model_name):
                # sys.exc_info() must be read here, while the exception is still being handled
                return self.failure_handler(record=data, exception=sys.exc_info())

            return self._register_model_validation_error_record(record)

Ancestors

Inherited members

class BasePartialBatchProcessor (event_type: EventType, model: Optional[ForwardRef('BatchTypeModels')] = None)

Abstract class for batch processors.

Process batch and partially report failed items

Parameters

event_type : EventType
Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
model : Optional["BatchTypeModels"]
Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord

Exceptions

BatchProcessingError Raised when the entire batch has failed processing

Expand source code
class BasePartialBatchProcessor(BasePartialProcessor):  # noqa
    # Template for "nothing failed"; always deep-copied so instances never share the inner list
    DEFAULT_RESPONSE: PartialItemFailureResponse = {"batchItemFailures": []}

    def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None):
        """Process batch and partially report failed items

        Parameters
        ----------
        event_type: EventType
            Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
        model: Optional["BatchTypeModels"]
            Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord

        Exceptions
        ----------
        BatchProcessingError
            Raised when the entire batch has failed processing
        """
        self.event_type = event_type
        self.model = model
        self.batch_response: PartialItemFailureResponse = copy.deepcopy(self.DEFAULT_RESPONSE)

        # Event source -> method that extracts the failed item identifiers for that source
        self._COLLECTOR_MAPPING = {
            EventType.SQS: self._collect_sqs_failures,
            EventType.KinesisDataStreams: self._collect_kinesis_failures,
            EventType.DynamoDBStreams: self._collect_dynamodb_failures,
        }
        # Event source -> data class used to wrap a raw record when no model is given
        self._DATA_CLASS_MAPPING = {
            EventType.SQS: SQSRecord,
            EventType.KinesisDataStreams: KinesisStreamRecord,
            EventType.DynamoDBStreams: DynamoDBRecord,
        }

        super().__init__()

    def response(self) -> PartialItemFailureResponse:
        """Batch items that failed processing, if any"""
        return self.batch_response

    def _prepare(self):
        """
        Reset state left over from a previous execution.
        """
        for leftovers in (self.success_messages, self.fail_messages, self.exceptions):
            leftovers.clear()
        self.batch_response = copy.deepcopy(self.DEFAULT_RESPONSE)

    def _clean(self):
        """
        Build the partial failure response, or raise if every record failed.
        """
        if not self._has_messages_to_report():
            return

        if self._entire_batch_failed():
            summary = (
                f"All records failed processing. {len(self.exceptions)} individual errors logged "
                f"separately below."
            )
            raise BatchProcessingError(msg=summary, child_exceptions=self.exceptions)

        self.batch_response = {"batchItemFailures": self._get_messages_to_report()}

    def _has_messages_to_report(self) -> bool:
        """Tell whether any record failed; logs a debug line when none did."""
        if not self.fail_messages:
            logger.debug(f"All {len(self.success_messages)} records successfully processed")
            return False

        return True

    def _entire_batch_failed(self) -> bool:
        """Tell whether there are as many recorded exceptions as records in the batch."""
        return len(self.records) == len(self.exceptions)

    def _get_messages_to_report(self) -> List[PartialItemFailures]:
        """
        Collect failed item identifiers using the collector for this event type
        """
        collect_failures = self._COLLECTOR_MAPPING[self.event_type]
        return collect_failures()

    # Event Source Data Classes follow python idioms for fields
    # while Parser/Pydantic follows the event field names to the letter
    def _collect_sqs_failures(self):
        """Return one {"itemIdentifier": <message id>} entry per failed SQS message."""
        failures = []
        for failed in self.fail_messages:
            # If a message failed due to model validation (e.g., poison pill)
            # it was converted to an event source data class...but self.model is still truthy,
            # hence the additional check on whether the failed message is still a model
            # see https://github.com/aws-powertools/powertools-lambda-python/issues/2091
            still_a_model = self.model and getattr(failed, "parse_obj", None)
            item_id = failed.messageId if still_a_model else failed.message_id
            failures.append({"itemIdentifier": item_id})
        return failures

    def _collect_kinesis_failures(self):
        """Return one {"itemIdentifier": <sequence number>} entry per failed Kinesis record."""
        failures = []
        for failed in self.fail_messages:
            # see https://github.com/aws-powertools/powertools-lambda-python/issues/2091
            still_a_model = self.model and getattr(failed, "parse_obj", None)
            item_id = failed.kinesis.sequenceNumber if still_a_model else failed.kinesis.sequence_number
            failures.append({"itemIdentifier": item_id})
        return failures

    def _collect_dynamodb_failures(self):
        """Return one {"itemIdentifier": <sequence number>} entry per failed DynamoDB record."""
        failures = []
        for failed in self.fail_messages:
            # see https://github.com/aws-powertools/powertools-lambda-python/issues/2091
            still_a_model = self.model and getattr(failed, "parse_obj", None)
            item_id = failed.dynamodb.SequenceNumber if still_a_model else failed.dynamodb.sequence_number
            failures.append({"itemIdentifier": item_id})
        return failures

    @overload
    def _to_batch_type(self, record: dict, event_type: EventType, model: "BatchTypeModels") -> "BatchTypeModels":
        ...  # pragma: no cover

    @overload
    def _to_batch_type(self, record: dict, event_type: EventType) -> EventSourceDataClassTypes:
        ...  # pragma: no cover

    def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["BatchTypeModels"] = None):
        """Wrap a raw record in the given model, or in the event type's data class when no model is given."""
        if model is None:
            return self._DATA_CLASS_MAPPING[event_type](record)

        # A model was provided, so we assume Pydantic is installed and we need to disable v2 warnings
        from aws_lambda_powertools.utilities.parser.compat import disable_pydantic_v2_warning

        disable_pydantic_v2_warning()

        return model.parse_obj(record)

    def _register_model_validation_error_record(self, record: dict):
        """Convert and register failure due to poison pills where model failed validation early"""
        # Parser will fail validation if record is a poison pill (malformed input)
        # this means we can't collect the message id if we try transforming again
        # so we convert it to the equivalent batch type (e.g., SQS, Kinesis, DynamoDB Stream)
        # and downstream we can collect the correct message identifier and make the failed record available
        # see https://github.com/aws-powertools/powertools-lambda-python/issues/2091
        logger.debug("Record cannot be converted to customer's model; converting without model")
        as_data_class: "EventSourceDataClassTypes" = self._to_batch_type(record=record, event_type=self.event_type)
        return self.failure_handler(record=as_data_class, exception=sys.exc_info())

Ancestors

Subclasses

Class variables

var DEFAULT_RESPONSE : PartialItemFailureResponse

Methods

def response(self) ‑> PartialItemFailureResponse

Batch items that failed processing, if any

Expand source code
def response(self) -> PartialItemFailureResponse:
    """Return the stored partial batch response (batch items that failed processing, if any)"""
    failures_report = self.batch_response
    return failures_report

Inherited members

class BasePartialProcessor

Abstract class for batch processors.

Expand source code
class BasePartialProcessor(ABC):
    """
    Abstract class for batch processors.

    Subclasses provide `_prepare`, `_clean`, `_process_record` and `_async_process_record`.
    Calling the instance stores the records and the record handler; entering it as a context
    manager runs `_prepare`, and leaving it runs `_clean`.
    """

    lambda_context: LambdaContext

    def __init__(self):
        # Records passed to success_handler / failure_handler, plus exception info of failures
        self.success_messages: List[BatchEventTypes] = []
        self.fail_messages: List[BatchEventTypes] = []
        self.exceptions: List[ExceptionInfo] = []

    @abstractmethod
    def _prepare(self):
        """
        Prepare context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _clean(self):
        """
        Clear context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _process_record(self, record: dict):
        """
        Process record with handler.
        """
        raise NotImplementedError()

    def process(self) -> List[Tuple]:
        """
        Call instance's handler for each record.
        """
        return [self._process_record(record) for record in self.records]

    @abstractmethod
    async def _async_process_record(self, record: dict):
        """
        Async process record with handler.
        """
        raise NotImplementedError()

    def async_process(self) -> List[Tuple]:
        """
        Async call instance's handler for each record.

        Note
        ----

        We keep the outer function synchronous to prevent making Lambda handler async, so to not impact
        customers' existing middlewares. Instead, we create an async closure to handle asynchrony.

        We also handle edge cases like Lambda container thaw by getting an existing or creating an event loop.

        See: https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-shutdown
        """

        async def async_process_closure():
            return list(await asyncio.gather(*[self._async_process_record(record) for record in self.records]))

        # WARNING
        # Do not use "asyncio.run(async_process())" due to Lambda container thaws/freeze, otherwise we might get "Event Loop is closed" # noqa: E501
        # Instead, reuse the current event loop and only create a new one when none is usable
        # Mangum library does this as well. It's battle tested with other popular async-only frameworks like FastAPI
        # https://github.com/jordaneremieff/mangum/discussions/256#discussioncomment-2638946
        # https://github.com/jordaneremieff/mangum/blob/b85cd4a97f8ddd56094ccc540ca7156c76081745/mangum/protocols/http.py#L44

        # Decide whether we manage the event loop ourselves (Lambda) or schedule as usual (non-Lambda)
        if os.getenv(constants.LAMBDA_TASK_ROOT_ENV):
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # Newer Python versions raise instead of implicitly creating a loop when none is set
                loop = None

            # A loop closed by a previous invocation cannot schedule tasks; replace it
            if loop is None or loop.is_closed():
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

            # The coroutine is only created once a usable loop exists, so it is never left un-awaited
            task_instance = loop.create_task(async_process_closure())
            return loop.run_until_complete(task_instance)

        # Non-Lambda environment, run coroutine as usual
        return asyncio.run(async_process_closure())

    def __enter__(self):
        self._prepare()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self._clean()

    def __call__(self, records: List[dict], handler: Callable, lambda_context: Optional[LambdaContext] = None):
        """
        Set instance attributes before execution

        Parameters
        ----------
        records: List[dict]
            List with objects to be processed.
        handler: Callable
            Callable to process "records" entries.
        lambda_context: Optional[LambdaContext]
            Lambda context. When provided, the handler's signature is inspected for a
            `lambda_context` parameter and the outcome is stored in `_handler_accepts_lambda_context`.

        Returns
        -------
        BasePartialProcessor
            This same instance, so the call can be used directly in a `with` statement.
        """
        self.records = records
        self.handler = handler

        # NOTE: If a record handler has `lambda_context` parameter in its function signature, we inject it.
        # This is the earliest we can inspect for signature to prevent impacting performance.
        #
        #   Mechanism:
        #
        #   1. When using the `@batch_processor` decorator, this happens automatically.
        #   2. When using the context manager, customers have to include `lambda_context` param.
        #
        #   Scenario: Injects Lambda context
        #
        #   def record_handler(record, lambda_context): ... # noqa: ERA001
        #   with processor(records=batch, handler=record_handler, lambda_context=context): ... # noqa: ERA001
        #
        #   Scenario: Does NOT inject Lambda context (default)
        #
        #   def record_handler(record): pass # noqa: ERA001
        #   with processor(records=batch, handler=record_handler): ... # noqa: ERA001
        #
        if lambda_context is None:
            self._handler_accepts_lambda_context = False
        else:
            self.lambda_context = lambda_context
            self._handler_accepts_lambda_context = "lambda_context" in inspect.signature(self.handler).parameters

        return self

    def success_handler(self, record, result: Any) -> SuccessResponse:
        """
        Keeps track of batch records that were processed successfully

        Parameters
        ----------
        record: Any
            record that succeeded processing
        result: Any
            result from record handler

        Returns
        -------
        SuccessResponse
            "success", result, original record
        """
        entry = ("success", result, record)
        self.success_messages.append(record)
        return entry

    def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
        """
        Keeps track of batch records that failed processing

        Parameters
        ----------
        record: Any
            record that failed processing
        exception: ExceptionInfo
            Exception information containing type, value, and traceback (sys.exc_info())

        Returns
        -------
        FailureResponse
            "fail", exceptions args, original record
        """
        # Only the exception type and value go into the message; the traceback stays in `exceptions`
        exception_string = f"{exception[0]}:{exception[1]}"
        entry = ("fail", exception_string, record)
        logger.debug(f"Record processing exception: {exception_string}")
        self.exceptions.append(exception)
        self.fail_messages.append(record)
        return entry

Ancestors

  • abc.ABC

Subclasses

Class variables

var lambda_context : LambdaContext

Methods

def async_process(self) ‑> List[Tuple]

Async call instance's handler for each record.

Note

We keep the outer function synchronous to prevent making Lambda handler async, so to not impact customers' existing middlewares. Instead, we create an async closure to handle asynchrony.

We also handle edge cases like Lambda container thaw by getting an existing or creating an event loop.

See: https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-shutdown

Expand source code
def async_process(self) -> List[Tuple]:
    """
    Async call instance's handler for each record.

    Note
    ----

    We keep the outer function synchronous to prevent making Lambda handler async, so to not impact
    customers' existing middlewares. Instead, we create an async closure to handle asynchrony.

    We also handle edge cases like Lambda container thaw by getting an existing or creating an event loop.

    See: https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-shutdown
    """

    async def async_process_closure():
        return list(await asyncio.gather(*[self._async_process_record(record) for record in self.records]))

    # WARNING
    # Do not use "asyncio.run(async_process())" due to Lambda container thaws/freeze, otherwise we might get "Event Loop is closed" # noqa: E501
    # Instead, reuse the current event loop and only create a new one when none is usable
    # Mangum library does this as well. It's battle tested with other popular async-only frameworks like FastAPI
    # https://github.com/jordaneremieff/mangum/discussions/256#discussioncomment-2638946
    # https://github.com/jordaneremieff/mangum/blob/b85cd4a97f8ddd56094ccc540ca7156c76081745/mangum/protocols/http.py#L44

    # Decide whether we manage the event loop ourselves (Lambda) or schedule as usual (non-Lambda)
    if os.getenv(constants.LAMBDA_TASK_ROOT_ENV):
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions raise instead of implicitly creating a loop when none is set
            loop = None

        # A loop closed by a previous invocation cannot schedule tasks; replace it
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # The coroutine is only created once a usable loop exists, so it is never left un-awaited
        task_instance = loop.create_task(async_process_closure())
        return loop.run_until_complete(task_instance)

    # Non-Lambda environment, run coroutine as usual
    return asyncio.run(async_process_closure())
def failure_handler(self, record, exception: Tuple[Optional[Type[BaseException]], Optional[BaseException], Optional[traceback]]) ‑> Tuple[str, str, Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord, Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecord], ForwardRef(None)]]

Keeps track of batch records that failed processing

Parameters

record : Any
record that failed processing
exception : ExceptionInfo
Exception information containing type, value, and traceback (sys.exc_info())

Returns

FailureResponse
"fail", exceptions args, original record
Expand source code
def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
    """
    Register a batch record that failed processing

    Parameters
    ----------
    record: Any
        record that failed processing
    exception: ExceptionInfo
        Exception information containing type, value, and traceback (sys.exc_info())

    Returns
    -------
    FailureResponse
        "fail", "<exception type>:<exception value>" string, original record
    """
    exc_type, exc_value = exception[0], exception[1]
    exception_string = f"{exc_type}:{exc_value}"
    logger.debug(f"Record processing exception: {exception_string}")

    # Keep the full exception info and the failed record for later reporting
    self.exceptions.append(exception)
    self.fail_messages.append(record)
    return ("fail", exception_string, record)
def process(self) ‑> List[Tuple]

Call instance's handler for each record.

Expand source code
def process(self) -> List[Tuple]:
    """
    Run `_process_record` on every entry of `self.records`, in order.

    Returns
    -------
    List[Tuple]
        One `_process_record` result per record, in the same order as the records
    """
    outcomes: List[Tuple] = []
    for record in self.records:
        outcomes.append(self._process_record(record))
    return outcomes
def success_handler(self, record, result: Any) ‑> Tuple[str, Any, Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord, Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecord], ForwardRef(None)]]

Keeps track of batch records that were processed successfully

Parameters

record : Any
record that succeeded processing
result : Any
result from record handler

Returns

SuccessResponse
"success", result, original record
Expand source code
def success_handler(self, record, result: Any) -> SuccessResponse:
    """
    Register a batch record that was processed successfully

    Parameters
    ----------
    record: Any
        record that succeeded processing
    result: Any
        result from record handler

    Returns
    -------
    SuccessResponse
        "success", result, original record
    """
    # Only the record is tracked; the handler's result is returned to the caller
    self.success_messages.append(record)
    return ("success", result, record)
class BatchProcessor (event_type: EventType, model: Optional[ForwardRef('BatchTypeModels')] = None)

Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.

Example

Process batch triggered by SQS

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Process batch triggered by Kinesis Data Streams

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: KinesisStreamRecord):
    logger.info(record.kinesis.data_as_text)
    payload: dict = record.kinesis.data_as_json()
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Process batch triggered by DynamoDB Data Streams

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: DynamoDBRecord):
    logger.info(record.dynamodb.new_image)
    payload: dict = json.loads(record.dynamodb.new_image.get("item"))
    # alternatively:
    # changes: Dict[str, Any] = record.dynamodb.new_image  # noqa: ERA001
    # payload = changes.get("Message") -> "<payload>"
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        processed_messages = processor.process() # kick off processing, return list[tuple]

    return processor.response()

Raises

BatchProcessingError
When all batch records fail processing

Limitations

  • Async record handler not supported, use AsyncBatchProcessor instead.

Process batch and partially report failed items

Parameters

event_type : EventType
Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
model : Optional["BatchTypeModels"]
Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord

Exceptions

BatchProcessingError Raised when the entire batch has failed processing

Expand source code
class BatchProcessor(BasePartialBatchProcessor):  # Keep old name for compatibility
    """Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.

    Example
    -------

    ## Process batch triggered by SQS

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = BatchProcessor(event_type=EventType.SQS)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: SQSRecord):
        payload: str = record.body
        if payload:
            item: dict = json.loads(payload)
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```

    ## Process batch triggered by Kinesis Data Streams

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: KinesisStreamRecord):
        logger.info(record.kinesis.data_as_text)
        payload: dict = record.kinesis.data_as_json()
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```

    ## Process batch triggered by DynamoDB Data Streams

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: DynamoDBRecord):
        logger.info(record.dynamodb.new_image)
        payload: dict = json.loads(record.dynamodb.new_image.get("item"))
        # alternatively:
        # changes: Dict[str, Any] = record.dynamodb.new_image  # noqa: ERA001
        # payload = changes.get("Message") -> "<payload>"
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    def lambda_handler(event, context: LambdaContext):
        batch = event["Records"]
        with processor(records=batch, handler=record_handler):
            processed_messages = processor.process() # kick off processing, return list[tuple]

        return processor.response()
    ```


    Raises
    ------
    BatchProcessingError
        When all batch records fail processing

    Limitations
    -----------
    * Async record handler not supported, use AsyncBatchProcessor instead.
    """

    async def _async_process_record(self, record: dict):
        """Async processing is not implemented by this processor; always raises NotImplementedError."""
        raise NotImplementedError()

    def _process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: dict
            A batch record to be processed.

        Returns
        -------
        Union[SuccessResponse, FailureResponse]
            Result of `success_handler` when the handler returns, otherwise the result of
            registering the failure.
        """
        # Stays None if the conversion itself raises, and is then what failure_handler receives
        data: Optional["BatchTypeModels"] = None
        try:
            data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
            if self._handler_accepts_lambda_context:
                result = self.handler(record=data, lambda_context=self.lambda_context)
            else:
                result = self.handler(record=data)

            # Success is registered with the raw record, not the converted one
            return self.success_handler(record=record, result=result)
        except Exception as exc:
            # NOTE: Pydantic is an optional dependency, but when used and a poison pill scenario happens
            # we need to handle that exception differently.
            # We check for a public attr in validation errors coming from Pydantic exceptions (subclass or not)
            # and we compare if it's coming from the same model that trigger the exception in the first place

            # Pydantic v1 raises a ValidationError with ErrorWrappers and store the model instance in a class variable.
            # Pydantic v2 simplifies this by adding a title variable to store the model name directly.
            model = getattr(exc, "model", None) or getattr(exc, "title", None)
            model_name = getattr(self.model, "__name__", None)

            # Matches either the model object itself (v1) or its name (v2)
            if model in (self.model, model_name):
                return self._register_model_validation_error_record(record)

            # Any other exception: register the failure with the converted record
            return self.failure_handler(record=data, exception=sys.exc_info())

Ancestors

Subclasses

Inherited members

class EventType (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access::
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code
class EventType(Enum):
    """Event source of the batch being processed: SQS, Kinesis Data Streams, or DynamoDB Streams."""

    # Each member's value is the same string as its name
    SQS = "SQS"
    KinesisDataStreams = "KinesisDataStreams"
    DynamoDBStreams = "DynamoDBStreams"

Ancestors

  • enum.Enum

Class variables

var DynamoDBStreams
var KinesisDataStreams
var SQS
class SqsFifoPartialProcessor (model: Optional[ForwardRef('BatchSqsTypeModel')] = None)

Process native partial responses from SQS FIFO queues.

Stops processing records when the first record fails. The remaining records are reported as failed items.

Example


Process batch triggered by a FIFO SQS

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = SqsFifoPartialProcessor()
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Process batch and partially report failed items

Parameters

event_type : EventType
Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
model : Optional["BatchTypeModels"]
Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord

Exceptions

BatchProcessingError Raised when the entire batch has failed processing

Expand source code
class SqsFifoPartialProcessor(BatchProcessor):
    """Process native partial responses from SQS FIFO queues.

    Processing stops at the first record that fails. Every record after it is reported as a
    failed item without being handed to the record handler.

    Example
    -------

    ## Process batch triggered by a FIFO SQS

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = SqsFifoPartialProcessor()
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: SQSRecord):
        payload: str = record.body
        if payload:
            item: dict = json.loads(payload)
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```
    """

    # Exception info (type, value, traceback) registered for every record skipped by the short circuit
    circuit_breaker_exc = (
        SQSFifoCircuitBreakerError,
        SQSFifoCircuitBreakerError("A previous record failed processing"),
        None,
    )

    def __init__(self, model: Optional["BatchSqsTypeModel"] = None):
        # The event type is always SQS for this processor
        super().__init__(EventType.SQS, model)

    def process(self) -> List[Tuple]:
        """
        Call instance's handler for each record until one fails.

        Once a failed message is registered, the handler is no longer called and the remaining
        messages are reported as failed items.
        """
        processed: List[Tuple] = []

        for position, record in enumerate(self.records):
            if not self.fail_messages:
                # Nothing has failed so far: handle this record normally
                processed.append(self._process_record(record))
                continue

            # The previous record failed; fail this record and every record after it
            return self._short_circuit_processing(position, processed)

        return processed

    def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]:
        """
        Fail every record from `first_failure_index` onwards and append each failure to `result`.
        """
        for pending_record in self.records[first_failure_index:]:
            converted = self._to_batch_type(record=pending_record, event_type=self.event_type, model=self.model)
            failure = self.failure_handler(record=converted, exception=self.circuit_breaker_exc)
            result.append(failure)
        return result

    async def _async_process_record(self, record: dict):
        raise NotImplementedError()

Ancestors

Class variables

var circuit_breaker_exc

Methods

def process(self) ‑> List[Tuple]

Call instance's handler for each record. When the first failed message is detected, the process is short-circuited, and the remaining messages are reported as failed items.

Expand source code
def process(self) -> List[Tuple]:
    """
    Call instance's handler for each record until one fails.

    Once a failed message is registered, the handler is no longer called and the remaining
    messages are reported as failed items.
    """
    processed: List[Tuple] = []

    for position, record in enumerate(self.records):
        if not self.fail_messages:
            # Nothing has failed so far: handle this record normally
            processed.append(self._process_record(record))
            continue

        # The previous record failed; fail this record and every record after it
        return self._short_circuit_processing(position, processed)

    return processed

Inherited members