Module aws_lambda_powertools.utilities.parser.envelopes

Expand source code
from .base import BaseEnvelope
from .cloudwatch import CloudWatchLogsEnvelope
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
from .kinesis import KinesisDataStreamEnvelope
from .sns import SnsEnvelope, SnsSqsEnvelope
from .sqs import SqsEnvelope

__all__ = [
    "CloudWatchLogsEnvelope",
    "DynamoDBStreamEnvelope",
    "EventBridgeEnvelope",
    "KinesisDataStreamEnvelope",
    "SnsEnvelope",
    "SnsSqsEnvelope",
    "SqsEnvelope",
    "BaseEnvelope",
]

Sub-modules

aws_lambda_powertools.utilities.parser.envelopes.base
aws_lambda_powertools.utilities.parser.envelopes.cloudwatch
aws_lambda_powertools.utilities.parser.envelopes.dynamodb
aws_lambda_powertools.utilities.parser.envelopes.event_bridge
aws_lambda_powertools.utilities.parser.envelopes.kinesis
aws_lambda_powertools.utilities.parser.envelopes.sns
aws_lambda_powertools.utilities.parser.envelopes.sqs

Classes

class BaseEnvelope

ABC implementation for creating a supported Envelope

Expand source code
class BaseEnvelope(ABC):
    """ABC implementation for creating a supported Envelope"""

    @staticmethod
    def _parse(data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Union[Model, None]:
        """Parses envelope data against model provided

        Parameters
        ----------
        data : Dict
            Data to be parsed and validated
        model : Type[Model]
            Data model to parse and validate data against

        Returns
        -------
        Any
            Parsed data
        """
        if data is None:
            logger.debug("Skipping parsing as event is None")
            return data

        logger.debug("parsing event against model")
        if isinstance(data, str):
            logger.debug("parsing event as string")
            return model.parse_raw(data)

        return model.parse_obj(data)

    @abstractmethod
    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]):
        """Implementation to parse data against envelope model, then against the data model

        NOTE: Call `_parse` method to fully parse data with model provided.

        Example
        -------

        **EventBridge envelope implementation example**

        def parse(...):
            # 1. parses data against envelope model
            parsed_envelope = EventBridgeModel(**data)

            # 2. parses portion of data within the envelope against model
            return self._parse(data=parsed_envelope.detail, model=data_model)
        """
        return NotImplemented  # pragma: no cover

Ancestors

  • abc.ABC

Subclasses

Methods

def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model])

Implementation to parse data against envelope model, then against the data model

NOTE: Call _parse method to fully parse data with model provided.

Example

EventBridge envelope implementation example

def parse(…): # 1. parses data against envelope model parsed_envelope = EventBridgeModel(**data)

# 2. parses portion of data within the envelope against model
return self._parse(data=parsed_envelope.detail, model=data_model)
Expand source code
@abstractmethod
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]):
    """Implementation to parse data against envelope model, then against the data model

    NOTE: Call `_parse` method to fully parse data with model provided.

    Example
    -------

    **EventBridge envelope implementation example**

    def parse(...):
        # 1. parses data against envelope model
        parsed_envelope = EventBridgeModel(**data)

        # 2. parses portion of data within the envelope against model
        return self._parse(data=parsed_envelope.detail, model=data_model)
    """
    return NotImplemented  # pragma: no cover
class CloudWatchLogsEnvelope

CloudWatch Envelope to extract a List of log records.

The record's body parameter is a string (after being base64 decoded and gzipped), though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.

Note: The record will be parsed the same way so if model is str

Expand source code
class CloudWatchLogsEnvelope(BaseEnvelope):
    """CloudWatch Envelope to extract a List of log records.

    The record's body parameter is a string (after being base64 decoded and gzipped),
    though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: The record will be parsed the same way so if model is str
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with SNS model {CloudWatchLogsModel}")
        parsed_envelope = CloudWatchLogsModel.parse_obj(data)
        logger.debug(f"Parsing CloudWatch records in `body` with {model}")
        return [
            self._parse(data=record.message, model=model) for record in parsed_envelope.awslogs.decoded_data.logEvents
        ]

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Union[~Model, NoneType]]

Parses records found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with SNS model {CloudWatchLogsModel}")
    parsed_envelope = CloudWatchLogsModel.parse_obj(data)
    logger.debug(f"Parsing CloudWatch records in `body` with {model}")
    return [
        self._parse(data=record.message, model=model) for record in parsed_envelope.awslogs.decoded_data.logEvents
    ]
class DynamoDBStreamEnvelope

DynamoDB Stream Envelope to extract data within NewImage/OldImage

Note: Values are the parsed models. Images' values can also be None, and length of the list is the record's amount in the original event.

Expand source code
class DynamoDBStreamEnvelope(BaseEnvelope):
    """DynamoDB Stream Envelope to extract data within NewImage/OldImage

    Note: Values are the parsed models. Images' values can also be None, and
    length of the list is the record's amount in the original event.
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Dict[str, Optional[Model]]]:
        """Parses DynamoDB Stream records found in either NewImage and OldImage with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of dictionaries with NewImage and OldImage records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with DynamoDB Stream model {DynamoDBStreamModel}")
        parsed_envelope = DynamoDBStreamModel.parse_obj(data)
        logger.debug(f"Parsing DynamoDB Stream new and old records with {model}")
        return [
            {
                "NewImage": self._parse(data=record.dynamodb.NewImage, model=model),
                "OldImage": self._parse(data=record.dynamodb.OldImage, model=model),
            }
            for record in parsed_envelope.Records
        ]

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Dict[str, Union[~Model, NoneType]]]

Parses DynamoDB Stream records found in either NewImage and OldImage with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of dictionaries with NewImage and OldImage records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Dict[str, Optional[Model]]]:
    """Parses DynamoDB Stream records found in either NewImage and OldImage with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of dictionaries with NewImage and OldImage records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with DynamoDB Stream model {DynamoDBStreamModel}")
    parsed_envelope = DynamoDBStreamModel.parse_obj(data)
    logger.debug(f"Parsing DynamoDB Stream new and old records with {model}")
    return [
        {
            "NewImage": self._parse(data=record.dynamodb.NewImage, model=model),
            "OldImage": self._parse(data=record.dynamodb.OldImage, model=model),
        }
        for record in parsed_envelope.Records
    ]
class EventBridgeEnvelope

EventBridge envelope to extract data within detail key

Expand source code
class EventBridgeEnvelope(BaseEnvelope):
    """EventBridge envelope to extract data within detail key"""

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        Any
            Parsed detail payload with model provided
        """
        logger.debug(f"Parsing incoming data with EventBridge model {EventBridgeModel}")
        parsed_envelope = EventBridgeModel.parse_obj(data)
        logger.debug(f"Parsing event payload in `detail` with {model}")
        return self._parse(data=parsed_envelope.detail, model=model)

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> Union[~Model, NoneType]

Parses data found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

Any
Parsed detail payload with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
    """Parses data found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    Any
        Parsed detail payload with model provided
    """
    logger.debug(f"Parsing incoming data with EventBridge model {EventBridgeModel}")
    parsed_envelope = EventBridgeModel.parse_obj(data)
    logger.debug(f"Parsing event payload in `detail` with {model}")
    return self._parse(data=parsed_envelope.detail, model=model)
class KinesisDataStreamEnvelope

Kinesis Data Stream Envelope to extract array of Records

The record's data parameter is a base64 encoded string which is parsed into a bytes array, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.

Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa)

Expand source code
class KinesisDataStreamEnvelope(BaseEnvelope):
    """Kinesis Data Stream Envelope to extract array of Records

    The record's data parameter is a base64 encoded string which is parsed into a bytes array,
    though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and npt as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}")
        parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data)
        logger.debug(f"Parsing Kinesis records in `body` with {model}")
        return [
            self._parse(data=record.kinesis.data.decode("utf-8"), model=model) for record in parsed_envelope.Records
        ]

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Union[~Model, NoneType]]

Parses records found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}")
    parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data)
    logger.debug(f"Parsing Kinesis records in `body` with {model}")
    return [
        self._parse(data=record.kinesis.data.decode("utf-8"), model=model) for record in parsed_envelope.Records
    ]
class SnsEnvelope

SNS Envelope to extract array of Records

The record's body parameter is a string, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.

Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa)

Expand source code
class SnsEnvelope(BaseEnvelope):
    """SNS Envelope to extract array of Records

    The record's body parameter is a string, though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and npt as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with SNS model {SnsModel}")
        parsed_envelope = SnsModel.parse_obj(data)
        logger.debug(f"Parsing SNS records in `body` with {model}")
        return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records]

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Union[~Model, NoneType]]

Parses records found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with SNS model {SnsModel}")
    parsed_envelope = SnsModel.parse_obj(data)
    logger.debug(f"Parsing SNS records in `body` with {model}")
    return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records]
class SnsSqsEnvelope

SNS plus SQS Envelope to extract array of Records

Published messages from SNS to SQS has a slightly different payload. Since SNS payload is marshalled into Record key in SQS, we have to:

  1. Parse SQS schema with incoming data
  2. Unmarshall SNS payload and parse against SNS Notification model not SNS/SNS Record
  3. Finally, parse provided model against payload extracted
Expand source code
class SnsSqsEnvelope(BaseEnvelope):
    """SNS plus SQS Envelope to extract array of Records

    Published messages from SNS to SQS has a slightly different payload.
    Since SNS payload is marshalled into `Record` key in SQS, we have to:

    1. Parse SQS schema with incoming data
    2. Unmarshall SNS payload and parse against SNS Notification model not SNS/SNS Record
    3. Finally, parse provided model against payload extracted
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
        parsed_envelope = SqsModel.parse_obj(data)
        output = []
        for record in parsed_envelope.Records:
            sns_notification = SnsNotificationModel.parse_raw(record.body)
            output.append(self._parse(data=sns_notification.Message, model=model))
        return output

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Union[~Model, NoneType]]

Parses records found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
    parsed_envelope = SqsModel.parse_obj(data)
    output = []
    for record in parsed_envelope.Records:
        sns_notification = SnsNotificationModel.parse_raw(record.body)
        output.append(self._parse(data=sns_notification.Message, model=model))
    return output
class SqsEnvelope

SQS Envelope to extract array of Records

The record's body parameter is a string, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.

Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa)

Expand source code
class SqsEnvelope(BaseEnvelope):
    """SQS Envelope to extract array of Records

    The record's body parameter is a string, though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and npt as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
        parsed_envelope = SqsModel.parse_obj(data)
        logger.debug(f"Parsing SQS records in `body` with {model}")
        return [self._parse(data=record.body, model=model) for record in parsed_envelope.Records]

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Union[~Model, NoneType]]

Parses records found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
    parsed_envelope = SqsModel.parse_obj(data)
    logger.debug(f"Parsing SQS records in `body` with {model}")
    return [self._parse(data=record.body, model=model) for record in parsed_envelope.Records]