Module aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor
Classes
class SqsFifoPartialProcessor (model: BatchSqsTypeModel | None = None, skip_group_on_error: bool = False)
-
Expand source code
class SqsFifoPartialProcessor(BatchProcessor): """Process native partial responses from SQS FIFO queues. Stops processing records when the first record fails. The remaining records are reported as failed items. Example _______ ## Process batch triggered by a FIFO SQS ```python import json from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext processor = SqsFifoPartialProcessor() tracer = Tracer() logger = Logger() @tracer.capture_method def record_handler(record: SQSRecord): payload: str = record.body if payload: item: dict = json.loads(payload) ... @logger.inject_lambda_context @tracer.capture_lambda_handler @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): return processor.response() ``` """ circuit_breaker_exc = ( SQSFifoCircuitBreakerError, SQSFifoCircuitBreakerError("A previous record failed processing"), None, ) group_circuit_breaker_exc = ( SQSFifoMessageGroupCircuitBreakerError, SQSFifoMessageGroupCircuitBreakerError("A previous record from this message group failed processing"), None, ) def __init__(self, model: BatchSqsTypeModel | None = None, skip_group_on_error: bool = False): """ Initialize the SqsFifoProcessor. Parameters ---------- model: BatchSqsTypeModel | None An optional model for batch processing. skip_group_on_error: bool Determines whether to exclusively skip messages from the MessageGroupID that encountered processing failures Default is False. """ self._skip_group_on_error: bool = skip_group_on_error self._current_group_id = None self._failed_group_ids: set[str] = set() super().__init__(EventType.SQS, model) def _process_record(self, record): self._current_group_id = record.get("attributes", {}).get("MessageGroupId") # Short-circuits the process if: # - There are failed messages, OR # - The `skip_group_on_error` option is on, and the current message is part of a failed group. fail_entire_batch = bool(self.fail_messages) and not self._skip_group_on_error fail_group_id = self._skip_group_on_error and self._current_group_id in self._failed_group_ids if fail_entire_batch or fail_group_id: return self.failure_handler( record=self._to_batch_type(record, event_type=self.event_type, model=self.model), exception=self.group_circuit_breaker_exc if self._skip_group_on_error else self.circuit_breaker_exc, ) return super()._process_record(record) def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse: # If we are failing a message and the `skip_group_on_error` is on, we store the failed group ID # This way, future messages with the same group ID will be failed automatically. if self._skip_group_on_error and self._current_group_id: self._failed_group_ids.add(self._current_group_id) return super().failure_handler(record, exception) def _clean(self): self._failed_group_ids.clear() self._current_group_id = None super()._clean() async def _async_process_record(self, record: dict): raise NotImplementedError()
Process native partial responses from SQS FIFO queues.
Stops processing records when the first record fails. The remaining records are reported as failed items.
Example
Process batch triggered by a FIFO SQS
import json from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext processor = SqsFifoPartialProcessor() tracer = Tracer() logger = Logger() @tracer.capture_method def record_handler(record: SQSRecord): payload: str = record.body if payload: item: dict = json.loads(payload) ... @logger.inject_lambda_context @tracer.capture_lambda_handler @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): return processor.response()
Initialize the SqsFifoProcessor.
Parameters
model
:BatchSqsTypeModel | None
- An optional model for batch processing.
skip_group_on_error
:bool
- Determines whether to exclusively skip messages from the MessageGroupID that encountered processing failures Default is False.
Ancestors
Class variables
var circuit_breaker_exc
var group_circuit_breaker_exc
Inherited members