Module aws_lambda_powertools.utilities.batch.base

Batch processing utilities

Expand source code
# -*- coding: utf-8 -*-

"""
Batch processing utilities
"""

import logging
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Tuple

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator

logger = logging.getLogger(__name__)


class BasePartialProcessor(ABC):
    """
    Abstract class for batch processors.
    """

    def __init__(self):
        self.success_messages: List = []
        self.fail_messages: List = []
        self.exceptions: List = []

    @abstractmethod
    def _prepare(self):
        """
        Prepare context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _clean(self):
        """
        Clear context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _process_record(self, record: Any):
        """
        Process record with handler.
        """
        raise NotImplementedError()

    def process(self) -> List[Tuple]:
        """
        Call instance's handler for each record.
        """
        return [self._process_record(record) for record in self.records]

    def __enter__(self):
        self._prepare()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self._clean()

    def __call__(self, records: List[Any], handler: Callable):
        """
        Set instance attributes before execution

        Parameters
        ----------
        records: List[Any]
            List with objects to be processed.
        handler: Callable
            Callable to process "records" entries.
        """
        self.records = records
        self.handler = handler
        return self

    def success_handler(self, record: Any, result: Any):
        """
        Success callback

        Returns
        -------
        tuple
            "success", result, original record
        """
        entry = ("success", result, record)
        self.success_messages.append(record)
        return entry

    def failure_handler(self, record: Any, exception: Tuple):
        """
        Failure callback

        Returns
        -------
        tuple
            "fail", exceptions args, original record
        """
        exception_string = f"{exception[0]}:{exception[1]}"
        entry = ("fail", exception_string, record)
        logger.debug(f"Record processing exception: {exception_string}")
        self.exceptions.append(exception)
        self.fail_messages.append(record)
        return entry


@lambda_handler_decorator
def batch_processor(
    handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor
):
    """
    Middleware to handle batch event processing

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: Dict
        Lambda's Context
    record_handler: Callable
        Callable to process each record from the batch
    processor: PartialSQSProcessor
        Batch Processor to handle partial failure cases

    Examples
    --------
    **Processes Lambda's event with PartialSQSProcessor**

        >>> from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors

    """
    records = event["Records"]

    with processor(records, record_handler):
        processor.process()

    return handler(event, context)

Functions

def batch_processor(handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor)

Middleware to handle batch event processing

Parameters

handler : Callable
Lambda's handler
event : Dict
Lambda's Event
context : Dict
Lambda's Context
record_handler : Callable
Callable to process each record from the batch
processor : PartialSQSProcessor
Batch Processor to handle partial failure cases

Examples

Processes Lambda's event with PartialSQSProcessor

>>> from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
>>>
>>> def record_handler(record):
>>>     return record["body"]
>>>
>>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
>>> def handler(event, context):
>>>     return {"StatusCode": 200}

Limitations

  • Async batch processors
Expand source code
@lambda_handler_decorator
def batch_processor(
    handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor
):
    """
    Middleware to handle batch event processing

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: Dict
        Lambda's Context
    record_handler: Callable
        Callable to process each record from the batch
    processor: PartialSQSProcessor
        Batch Processor to handle partial failure cases

    Examples
    --------
    **Processes Lambda's event with PartialSQSProcessor**

        >>> from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors

    """
    records = event["Records"]

    with processor(records, record_handler):
        processor.process()

    return handler(event, context)

Classes

class BasePartialProcessor

Abstract class for batch processors.

Expand source code
class BasePartialProcessor(ABC):
    """
    Abstract class for batch processors.
    """

    def __init__(self):
        self.success_messages: List = []
        self.fail_messages: List = []
        self.exceptions: List = []

    @abstractmethod
    def _prepare(self):
        """
        Prepare context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _clean(self):
        """
        Clear context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _process_record(self, record: Any):
        """
        Process record with handler.
        """
        raise NotImplementedError()

    def process(self) -> List[Tuple]:
        """
        Call instance's handler for each record.
        """
        return [self._process_record(record) for record in self.records]

    def __enter__(self):
        self._prepare()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self._clean()

    def __call__(self, records: List[Any], handler: Callable):
        """
        Set instance attributes before execution

        Parameters
        ----------
        records: List[Any]
            List with objects to be processed.
        handler: Callable
            Callable to process "records" entries.
        """
        self.records = records
        self.handler = handler
        return self

    def success_handler(self, record: Any, result: Any):
        """
        Success callback

        Returns
        -------
        tuple
            "success", result, original record
        """
        entry = ("success", result, record)
        self.success_messages.append(record)
        return entry

    def failure_handler(self, record: Any, exception: Tuple):
        """
        Failure callback

        Returns
        -------
        tuple
            "fail", exceptions args, original record
        """
        exception_string = f"{exception[0]}:{exception[1]}"
        entry = ("fail", exception_string, record)
        logger.debug(f"Record processing exception: {exception_string}")
        self.exceptions.append(exception)
        self.fail_messages.append(record)
        return entry

Ancestors

  • abc.ABC

Subclasses

Methods

def failure_handler(self, record: Any, exception: Tuple)

Failure callback

Returns

tuple
"fail", exceptions args, original record
Expand source code
def failure_handler(self, record: Any, exception: Tuple):
    """
    Failure callback

    Returns
    -------
    tuple
        "fail", exceptions args, original record
    """
    exception_string = f"{exception[0]}:{exception[1]}"
    entry = ("fail", exception_string, record)
    logger.debug(f"Record processing exception: {exception_string}")
    self.exceptions.append(exception)
    self.fail_messages.append(record)
    return entry
def process(self) ‑> List[Tuple]

Call instance's handler for each record.

Expand source code
def process(self) -> List[Tuple]:
    """
    Call instance's handler for each record.
    """
    return [self._process_record(record) for record in self.records]
def success_handler(self, record: Any, result: Any)

Success callback

Returns

tuple
"success", result, original record
Expand source code
def success_handler(self, record: Any, result: Any):
    """
    Success callback

    Returns
    -------
    tuple
        "success", result, original record
    """
    entry = ("success", result, record)
    self.success_messages.append(record)
    return entry