Module aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor
Expand source code
from typing import List, Optional, Tuple
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel
class SQSFifoCircuitBreakerError(Exception):
"""
Signals a record not processed due to the SQS FIFO processing being interrupted
"""
pass
class SqsFifoPartialProcessor(BatchProcessor):
"""Process native partial responses from SQS FIFO queues.
Stops processing records when the first record fails. The remaining records are reported as failed items.
Example
_______
## Process batch triggered by a FIFO SQS
```python
import json
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext
processor = SqsFifoPartialProcessor()
tracer = Tracer()
logger = Logger()
@tracer.capture_method
def record_handler(record: SQSRecord):
payload: str = record.body
if payload:
item: dict = json.loads(payload)
...
@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
return processor.response()
```
"""
circuit_breaker_exc = (
SQSFifoCircuitBreakerError,
SQSFifoCircuitBreakerError("A previous record failed processing"),
None,
)
def __init__(self, model: Optional["BatchSqsTypeModel"] = None):
super().__init__(EventType.SQS, model)
def process(self) -> List[Tuple]:
"""
Call instance's handler for each record. When the first failed message is detected,
the process is short-circuited, and the remaining messages are reported as failed items.
"""
result: List[Tuple] = []
for i, record in enumerate(self.records):
# If we have failed messages, it means that the last message failed.
# We then short circuit the process, failing the remaining messages
if self.fail_messages:
return self._short_circuit_processing(i, result)
# Otherwise, process the message normally
result.append(self._process_record(record))
return result
def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]:
"""
Starting from the first failure index, fail all the remaining messages, and append them to the result list.
"""
remaining_records = self.records[first_failure_index:]
for remaining_record in remaining_records:
data = self._to_batch_type(record=remaining_record, event_type=self.event_type, model=self.model)
result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc))
return result
async def _async_process_record(self, record: dict):
raise NotImplementedError()
Classes
class SQSFifoCircuitBreakerError (*args, **kwargs)
-
Signals a record not processed due to the SQS FIFO processing being interrupted
Expand source code
class SQSFifoCircuitBreakerError(Exception): """ Signals a record not processed due to the SQS FIFO processing being interrupted """ pass
Ancestors
- builtins.Exception
- builtins.BaseException
class SqsFifoPartialProcessor (model: Optional[ForwardRef('BatchSqsTypeModel')] = None)
-
Process native partial responses from SQS FIFO queues.
Stops processing records when the first record fails. The remaining records are reported as failed items.
Example
Process batch triggered by a FIFO SQS
import json from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext processor = SqsFifoPartialProcessor() tracer = Tracer() logger = Logger() @tracer.capture_method def record_handler(record: SQSRecord): payload: str = record.body if payload: item: dict = json.loads(payload) ... @logger.inject_lambda_context @tracer.capture_lambda_handler @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): return processor.response()
Process batch and partially report failed items
Parameters
event_type
:EventType
- Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
model
:Optional["BatchTypeModels"]
- Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord
Exceptions
BatchProcessingError Raised when the entire batch has failed processing
Expand source code
class SqsFifoPartialProcessor(BatchProcessor): """Process native partial responses from SQS FIFO queues. Stops processing records when the first record fails. The remaining records are reported as failed items. Example _______ ## Process batch triggered by a FIFO SQS ```python import json from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext processor = SqsFifoPartialProcessor() tracer = Tracer() logger = Logger() @tracer.capture_method def record_handler(record: SQSRecord): payload: str = record.body if payload: item: dict = json.loads(payload) ... @logger.inject_lambda_context @tracer.capture_lambda_handler @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): return processor.response() ``` """ circuit_breaker_exc = ( SQSFifoCircuitBreakerError, SQSFifoCircuitBreakerError("A previous record failed processing"), None, ) def __init__(self, model: Optional["BatchSqsTypeModel"] = None): super().__init__(EventType.SQS, model) def process(self) -> List[Tuple]: """ Call instance's handler for each record. When the first failed message is detected, the process is short-circuited, and the remaining messages are reported as failed items. """ result: List[Tuple] = [] for i, record in enumerate(self.records): # If we have failed messages, it means that the last message failed. # We then short circuit the process, failing the remaining messages if self.fail_messages: return self._short_circuit_processing(i, result) # Otherwise, process the message normally result.append(self._process_record(record)) return result def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]: """ Starting from the first failure index, fail all the remaining messages, and append them to the result list. """ remaining_records = self.records[first_failure_index:] for remaining_record in remaining_records: data = self._to_batch_type(record=remaining_record, event_type=self.event_type, model=self.model) result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc)) return result async def _async_process_record(self, record: dict): raise NotImplementedError()
Ancestors
Class variables
var circuit_breaker_exc
Methods
def process(self) ‑> List[Tuple]
-
Call instance's handler for each record. When the first failed message is detected, the process is short-circuited, and the remaining messages are reported as failed items.
Expand source code
def process(self) -> List[Tuple]: """ Call instance's handler for each record. When the first failed message is detected, the process is short-circuited, and the remaining messages are reported as failed items. """ result: List[Tuple] = [] for i, record in enumerate(self.records): # If we have failed messages, it means that the last message failed. # We then short circuit the process, failing the remaining messages if self.fail_messages: return self._short_circuit_processing(i, result) # Otherwise, process the message normally result.append(self._process_record(record)) return result
Inherited members