Module aws_lambda_powertools.utilities.batch
Batch processing utility
Expand source code
# -*- coding: utf-8 -*-
"""
Batch processing utility
"""
from .base import BasePartialProcessor, batch_processor
from .sqs import PartialSQSProcessor, sqs_batch_processor
__all__ = ("BasePartialProcessor", "PartialSQSProcessor", "batch_processor", "sqs_batch_processor")
Sub-modules
aws_lambda_powertools.utilities.batch.base
-
Batch processing utilities
aws_lambda_powertools.utilities.batch.exceptions
-
Batch processing exceptions
aws_lambda_powertools.utilities.batch.sqs
-
Batch SQS utilities
Functions
def batch_processor(handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor)
-
Middleware to handle batch event processing
Parameters
handler
:Callable
- Lambda's handler
event
:Dict
- Lambda's Event
context
:Dict
- Lambda's Context
record_handler
:Callable
- Callable to process each record from the batch
processor
:BasePartialProcessor
- Batch Processor to handle partial failure cases
Examples
Processes Lambda's event with PartialSQSProcessor
>>> from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor >>> >>> def record_handler(record): >>> return record["body"] >>> >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor()) >>> def handler(event, context): >>> return {"StatusCode": 200}
Limitations
- Async batch processors
Expand source code
@lambda_handler_decorator
def batch_processor(
    handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor
):
    """
    Middleware that runs a batch processor over the event's records, then calls the handler

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event; its "Records" entry is the batch handed to the processor
    context: Dict
        Lambda's Context
    record_handler: Callable
        Callable to process each record from the batch
    processor: BasePartialProcessor
        Batch Processor to handle partial failure cases

    Returns
    -------
    Whatever `handler(event, context)` returns

    Examples
    --------
    **Processes Lambda's event with PartialSQSProcessor**

        >>> from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors

    """
    # The processor is configured via __call__ and then used as a context manager,
    # so its prepare/clean hooks wrap the processing of the whole batch.
    with processor(event["Records"], record_handler):
        processor.process()

    return handler(event, context)
def sqs_batch_processor(handler: Callable, event: Dict, context: Dict, record_handler: Callable, config: Union[botocore.config.Config, NoneType] = None, suppress_exception: bool = False)
-
Middleware to handle SQS batch event processing
Parameters
handler
:Callable
- Lambda's handler
event
:Dict
- Lambda's Event
context
:Dict
- Lambda's Context
record_handler
:Callable
- Callable to process each record from the batch
config
:Config
- botocore config object
suppress_exception
:bool
, optional- Suppress exception raised if any messages fail processing, by default False
Examples
Processes Lambda's event with PartialSQSProcessor
>>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor >>> >>> def record_handler(record): >>> return record["body"] >>> >>> @sqs_batch_processor(record_handler=record_handler) >>> def handler(event, context): >>> return {"StatusCode": 200}
Limitations
- Async batch processors
Expand source code
@lambda_handler_decorator
def sqs_batch_processor(
    handler: Callable,
    event: Dict,
    context: Dict,
    record_handler: Callable,
    config: Optional[Config] = None,
    suppress_exception: bool = False,
):
    """
    Middleware that processes an SQS batch with PartialSQSProcessor, then calls the handler

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event; its "Records" entry is the batch handed to the processor
    context: Dict
        Lambda's Context
    record_handler: Callable
        Callable to process each record from the batch
    config: Config
        botocore config object; a default `Config()` is used when omitted
    suppress_exception: bool, optional
        Suppress exception raised if any messages fail processing, by default False

    Returns
    -------
    Whatever `handler(event, context)` returns

    Examples
    --------
    **Processes Lambda's event with PartialSQSProcessor**

        >>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @sqs_batch_processor(record_handler=record_handler)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors

    """
    sqs_processor = PartialSQSProcessor(config=config or Config(), suppress_exception=suppress_exception)

    # Prepare/clean hooks of the processor wrap the processing of the whole batch.
    with sqs_processor(event["Records"], record_handler):
        sqs_processor.process()

    return handler(event, context)
Classes
class BasePartialProcessor
-
Abstract class for batch processors.
Expand source code
class BasePartialProcessor(ABC):
    """
    Abstract base for batch processors.

    Subclasses provide `_prepare`, `_clean` and `_process_record`. An instance is
    configured by calling it with the records and the record handler, and is then
    used as a context manager: `_prepare` runs on enter and `_clean` runs on exit.
    """

    def __init__(self):
        # Records handled without error, records that failed, and the
        # exception info collected for each failure.
        self.exceptions: List = []
        self.fail_messages: List = []
        self.success_messages: List = []

    @abstractmethod
    def _prepare(self):
        """
        Hook executed when the context manager is entered.
        """
        raise NotImplementedError()

    @abstractmethod
    def _clean(self):
        """
        Hook executed when the context manager is exited.
        """
        raise NotImplementedError()

    @abstractmethod
    def _process_record(self, record: Any):
        """
        Process a single record with the configured handler.
        """
        raise NotImplementedError()

    def process(self) -> List[Tuple]:
        """
        Run `_process_record` on every configured record, in order, and collect the results.
        """
        outcomes = []
        for item in self.records:
            outcomes.append(self._process_record(item))
        return outcomes

    def __enter__(self):
        self._prepare()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        # Runs whether or not the body raised; the return value is None, so
        # an exception raised inside the `with` body is not swallowed here.
        self._clean()

    def __call__(self, records: List[Any], handler: Callable):
        """
        Set instance attributes before execution

        Parameters
        ----------
        records: List[Any]
            List with objects to be processed.
        handler: Callable
            Callable to process "records" entries.

        Returns
        -------
        BasePartialProcessor
            The same instance, so the call can be used directly in a `with` statement.
        """
        self.handler = handler
        self.records = records
        return self

    def success_handler(self, record: Any, result: Any):
        """
        Success callback: remember the record and build its result entry

        Returns
        -------
        tuple
            "success", result, original record
        """
        self.success_messages.append(record)
        return ("success", result, record)

    def failure_handler(self, record: Any, exception: Tuple):
        """
        Failure callback: log and remember the failure and build its result entry

        Parameters
        ----------
        record: Any
            The record whose processing failed.
        exception: Tuple
            Exception info; only the first two items are used for the message.

        Returns
        -------
        tuple
            "fail", exceptions args, original record
        """
        exception_string = f"{exception[0]}:{exception[1]}"
        logger.debug(f"Record processing exception: {exception_string}")

        self.exceptions.append(exception)
        self.fail_messages.append(record)
        return ("fail", exception_string, record)
Ancestors
- abc.ABC
Subclasses
Methods
def failure_handler(self, record: Any, exception: Tuple)
-
Failure callback
Returns
tuple
- "fail", exceptions args, original record
Expand source code
def failure_handler(self, record: Any, exception: Tuple):
    """
    Failure callback: log and remember the failure and build its result entry

    Parameters
    ----------
    record: Any
        The record whose processing failed.
    exception: Tuple
        Exception info; only the first two items are used for the message.

    Returns
    -------
    tuple
        "fail", exceptions args, original record
    """
    exception_string = f"{exception[0]}:{exception[1]}"
    logger.debug(f"Record processing exception: {exception_string}")

    # Keep the full exception info and the failed record on the instance.
    self.exceptions.append(exception)
    self.fail_messages.append(record)
    return ("fail", exception_string, record)
def process(self) ‑> List[Tuple]
-
Call instance's handler for each record.
Expand source code
def process(self) -> List[Tuple]:
    """
    Run `_process_record` on every configured record, in order, and collect the results.
    """
    outcomes = []
    for item in self.records:
        outcomes.append(self._process_record(item))
    return outcomes
def success_handler(self, record: Any, result: Any)
-
Success callback
Returns
tuple
- "success", result, original record
Expand source code
def success_handler(self, record: Any, result: Any):
    """
    Success callback: remember the record and build its result entry

    Returns
    -------
    tuple
        "success", result, original record
    """
    self.success_messages.append(record)
    return ("success", result, record)
class PartialSQSProcessor (config: Union[botocore.config.Config, NoneType] = None, suppress_exception: bool = False)
-
Amazon SQS batch processor to delete successes from the Queue.
The whole batch will be processed, even if failures occur. After all records are processed, SQSBatchProcessingError will be raised if there were any failures, causing messages to be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception.
Parameters
config
:Config
- botocore config object
suppress_exception
:bool
, optional- Suppress exception raised if any messages fail processing, by default False
Example
Process batch triggered by SQS
>>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor >>> >>> def record_handler(record): >>> return record["body"] >>> >>> def handler(event, context): >>> records = event["Records"] >>> processor = PartialSQSProcessor() >>> >>> with processor(records=records, handler=record_handler): >>> result = processor.process() >>> >>> # Case a partial failure occurred, all successful executions >>> # have been deleted from the queue after context's exit. >>> >>> return result
Initializes sqs client.
Expand source code
class PartialSQSProcessor(BasePartialProcessor):
    """
    Amazon SQS batch processor to delete successes from the Queue.

    The whole batch will be processed, even if failures occur. After all records are processed,
    SQSBatchProcessingError will be raised if there were any failures, causing messages to
    be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception.

    Parameters
    ----------
    config: Config
        botocore config object
    suppress_exception: bool, optional
        Suppress exception raised if any messages fail processing, by default False


    Example
    -------
    **Process batch triggered by SQS**

        >>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> def handler(event, context):
        >>>     records = event["Records"]
        >>>     processor = PartialSQSProcessor()
        >>>
        >>>     with processor(records=records, handler=record_handler):
        >>>         result = processor.process()
        >>>
        >>>     # Case a partial failure occurred, all successful executions
        >>>     # have been deleted from the queue after context's exit.
        >>>
        >>>     return result

    """

    def __init__(self, config: Optional[Config] = None, suppress_exception: bool = False):
        """
        Initializes sqs client.
        """
        config = config or Config()
        self.client = boto3.client("sqs", config=config)
        self.suppress_exception = suppress_exception

        super().__init__()

    def _get_queue_url(self) -> Optional[str]:
        """
        Format QueueUrl from first records entry

        Returns None when no records have been set on the instance (or the list is empty).
        """
        if not getattr(self, "records", None):
            return None

        # The last two colon-separated fields of the ARN are used as account id and queue name.
        *_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":")
        # NOTE(review): relies on the client's private `_endpoint` attribute.
        return f"{self.client._endpoint.host}/{account_id}/{queue_name}"

    def _get_entries_to_clean(self) -> List:
        """
        Format messages to use in batch deletion
        """
        return [{"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]} for msg in self.success_messages]

    def _process_record(self, record) -> Tuple:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: Any
            An object to be processed.

        Returns
        -------
        tuple
            The entry built by `success_handler` or, if the handler raised, by `failure_handler`.
        """
        try:
            result = self.handler(record=record)
            return self.success_handler(record=record, result=result)
        except Exception:
            # Deliberately broad: a failing record must not abort the rest of the batch.
            return self.failure_handler(record=record, exception=sys.exc_info())

    def _prepare(self):
        """
        Remove results from previous execution.
        """
        self.success_messages.clear()
        self.fail_messages.clear()
        # Also drop exception info from earlier runs; otherwise a reused instance would
        # report stale errors in the next SQSBatchProcessingError.
        self.exceptions.clear()

    def _clean(self):
        """
        Delete messages from Queue in case of partial failure.

        Returns
        -------
        The `delete_message_batch` response, or None when nothing was deleted.

        Raises
        ------
        SQSBatchProcessingError
            If any record failed and `suppress_exception` is False.
        """
        # If all messages were successful, fall back to the default SQS -
        # Lambda behaviour which deletes messages if Lambda responds successfully
        if not self.fail_messages:
            logger.debug(f"All {len(self.success_messages)} records successfully processed")
            return

        queue_url = self._get_queue_url()
        entries_to_remove = self._get_entries_to_clean()

        delete_message_response = None
        if entries_to_remove:
            delete_message_response = self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)

        if self.suppress_exception:
            logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed")
        else:
            logger.debug(f"{len(self.fail_messages)} records failed processing, raising exception")
            raise SQSBatchProcessingError(
                msg=f"Not all records processed successfully. {len(self.exceptions)} individual errors logged "
                f"separately below.",
                child_exceptions=self.exceptions,
            )

        return delete_message_response
Ancestors
- BasePartialProcessor
- abc.ABC
Inherited members