Module aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor

Classes

class SqsFifoPartialProcessor (model: BatchSqsTypeModel | None = None, skip_group_on_error: bool = False)

Process native partial responses from SQS FIFO queues.

Stops processing records when the first record fails. The remaining records are reported as failed items.

Example


Process a batch triggered by an SQS FIFO queue

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = SqsFifoPartialProcessor()
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
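
The dictionary returned by processor.response() is the partial batch response that Lambda reports back to SQS. As a sketch, assuming one record genuinely failed and the records after it were short-circuited (the message IDs are placeholders filled from the incoming event at runtime):

{
    "batchItemFailures": [
        {"itemIdentifier": "<messageId of the record that failed>"},
        {"itemIdentifier": "<messageId of each record short-circuited after it>"}
    ]
}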

Initialize the SqsFifoPartialProcessor.

Parameters

model : BatchSqsTypeModel | None
An optional Pydantic model used to parse each SQS record before it reaches the record handler.
skip_group_on_error : bool
Whether to skip only the remaining messages from the MessageGroupId that encountered a processing failure, instead of short-circuiting the entire batch. Default is False.
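
As an illustration, a minimal sketch of constructing the processor with and without this option (the variable names are arbitrary):

from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor

# Default: the first failure short-circuits every remaining record in the batch.
processor = SqsFifoPartialProcessor()

# Only the remaining records of the MessageGroupId that failed are skipped;
# records from other message groups keep being processed.
group_aware_processor = SqsFifoPartialProcessor(skip_group_on_error=True)
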
Source code
class SqsFifoPartialProcessor(BatchProcessor):
    """Process native partial responses from SQS FIFO queues.

    Stops processing records when the first record fails. The remaining records are reported as failed items.

    Example
    _______

    ## Process a batch triggered by an SQS FIFO queue

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = SqsFifoPartialProcessor()
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: SQSRecord):
        payload: str = record.body
        if payload:
            item: dict = json.loads(payload)
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```
    """

    # Pre-built exception info tuples (type, instance, traceback) passed to
    # failure_handler for records that are short-circuited without being processed.
    circuit_breaker_exc = (
        SQSFifoCircuitBreakerError,
        SQSFifoCircuitBreakerError("A previous record failed processing"),
        None,
    )

    group_circuit_breaker_exc = (
        SQSFifoMessageGroupCircuitBreakerError,
        SQSFifoMessageGroupCircuitBreakerError("A previous record from this message group failed processing"),
        None,
    )

    def __init__(self, model: BatchSqsTypeModel | None = None, skip_group_on_error: bool = False):
        """
        Initialize the SqsFifoPartialProcessor.

        Parameters
        ----------
        model: BatchSqsTypeModel | None
            An optional Pydantic model used to parse each SQS record before it reaches the record handler.
        skip_group_on_error: bool
            Whether to skip only the remaining messages from the MessageGroupId that encountered a processing
            failure, instead of short-circuiting the entire batch. Default is False.

        """
        self._skip_group_on_error: bool = skip_group_on_error
        self._current_group_id = None
        self._failed_group_ids: set[str] = set()
        super().__init__(EventType.SQS, model)

    def _process_record(self, record):
        self._current_group_id = record.get("attributes", {}).get("MessageGroupId")

        # Short-circuits the process if:
        #     - There are failed messages, OR
        #     - The `skip_group_on_error` option is on, and the current message is part of a failed group.
        fail_entire_batch = bool(self.fail_messages) and not self._skip_group_on_error
        fail_group_id = self._skip_group_on_error and self._current_group_id in self._failed_group_ids
        if fail_entire_batch or fail_group_id:
            return self.failure_handler(
                record=self._to_batch_type(record, event_type=self.event_type, model=self.model),
                exception=self.group_circuit_breaker_exc if self._skip_group_on_error else self.circuit_breaker_exc,
            )

        return super()._process_record(record)

    def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
        # If we are failing a message and the `skip_group_on_error` is on, we store the failed group ID
        # This way, future messages with the same group ID will be failed automatically.
        if self._skip_group_on_error and self._current_group_id:
            self._failed_group_ids.add(self._current_group_id)

        return super().failure_handler(record, exception)

    def _clean(self):
        self._failed_group_ids.clear()
        self._current_group_id = None

        super()._clean()

    async def _async_process_record(self, record: dict):
        # Async record handlers are not supported by this processor.
        raise NotImplementedError()
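
To complement the example above, here is a minimal sketch of the group-skipping behaviour (the "poison" body check and the handler logic are made up for illustration): once a record from one MessageGroupId fails, only the remaining records of that group are reported as failures, while records from other groups continue to be processed.

from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = SqsFifoPartialProcessor(skip_group_on_error=True)


def record_handler(record: SQSRecord):
    # Hypothetical handler: raising any exception marks the record as failed and
    # short-circuits the remaining records of the same MessageGroupId.
    if record.body == "poison":
        raise ValueError("unable to process this record")


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    # Failed and short-circuited records are reported back to SQS in the
    # partial batch response; successfully processed records are deleted from the queue.
    return processor.response()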

Ancestors

BatchProcessor

Class variables

var circuit_breaker_exc
Exception info tuple (type, instance, traceback) recorded against every remaining record once any record in the batch has failed (default behaviour, skip_group_on_error=False).
var group_circuit_breaker_exc
Exception info tuple (type, instance, traceback) recorded against the remaining records of a MessageGroupId that already had a failure (skip_group_on_error=True).

Inherited members

All remaining members are inherited from BatchProcessor and its base classes.