Module aws_lambda_powertools.utilities.parser.envelopes

Expand source code
from .apigw import ApiGatewayEnvelope
from .apigwv2 import ApiGatewayV2Envelope
from .base import BaseEnvelope
from .bedrock_agent import BedrockAgentEnvelope
from .cloudwatch import CloudWatchLogsEnvelope
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
from .kafka import KafkaEnvelope
from .kinesis import KinesisDataStreamEnvelope
from .kinesis_firehose import KinesisFirehoseEnvelope
from .lambda_function_url import LambdaFunctionUrlEnvelope
from .sns import SnsEnvelope, SnsSqsEnvelope
from .sqs import SqsEnvelope
from .vpc_lattice import VpcLatticeEnvelope
from .vpc_latticev2 import VpcLatticeV2Envelope

__all__ = [
    "ApiGatewayEnvelope",
    "ApiGatewayV2Envelope",
    "BedrockAgentEnvelope",
    "CloudWatchLogsEnvelope",
    "DynamoDBStreamEnvelope",
    "EventBridgeEnvelope",
    "KinesisDataStreamEnvelope",
    "KinesisFirehoseEnvelope",
    "LambdaFunctionUrlEnvelope",
    "SnsEnvelope",
    "SnsSqsEnvelope",
    "SqsEnvelope",
    "KafkaEnvelope",
    "BaseEnvelope",
    "VpcLatticeEnvelope",
    "VpcLatticeV2Envelope",
]

Sub-modules

aws_lambda_powertools.utilities.parser.envelopes.apigw
aws_lambda_powertools.utilities.parser.envelopes.apigwv2
aws_lambda_powertools.utilities.parser.envelopes.base
aws_lambda_powertools.utilities.parser.envelopes.bedrock_agent
aws_lambda_powertools.utilities.parser.envelopes.cloudwatch
aws_lambda_powertools.utilities.parser.envelopes.dynamodb
aws_lambda_powertools.utilities.parser.envelopes.event_bridge
aws_lambda_powertools.utilities.parser.envelopes.kafka
aws_lambda_powertools.utilities.parser.envelopes.kinesis
aws_lambda_powertools.utilities.parser.envelopes.kinesis_firehose
aws_lambda_powertools.utilities.parser.envelopes.lambda_function_url
aws_lambda_powertools.utilities.parser.envelopes.sns
aws_lambda_powertools.utilities.parser.envelopes.sqs
aws_lambda_powertools.utilities.parser.envelopes.vpc_lattice
aws_lambda_powertools.utilities.parser.envelopes.vpc_latticev2

Classes

class ApiGatewayEnvelope

API Gateway envelope to extract data within body key

Expand source code
class ApiGatewayEnvelope(BaseEnvelope):
    """API Gateway envelope to extract data within body key"""

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        Any
            Parsed detail payload with model provided
        """
        logger.debug(f"Parsing incoming data with Api Gateway model {APIGatewayProxyEventModel}")
        parsed_envelope: APIGatewayProxyEventModel = APIGatewayProxyEventModel.parse_obj(data)
        logger.debug(f"Parsing event payload in `detail` with {model}")
        return self._parse(data=parsed_envelope.body, model=model)

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> Optional[~Model]

Parses data found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

Any
Parsed detail payload with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
    """Parses data found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    Any
        Parsed detail payload with model provided
    """
    logger.debug(f"Parsing incoming data with Api Gateway model {APIGatewayProxyEventModel}")
    parsed_envelope: APIGatewayProxyEventModel = APIGatewayProxyEventModel.parse_obj(data)
    logger.debug(f"Parsing event payload in `detail` with {model}")
    return self._parse(data=parsed_envelope.body, model=model)
class ApiGatewayV2Envelope

API Gateway V2 envelope to extract data within body key

Expand source code
class ApiGatewayV2Envelope(BaseEnvelope):
    """API Gateway V2 envelope to extract data within body key"""

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        Any
            Parsed detail payload with model provided
        """
        logger.debug(f"Parsing incoming data with Api Gateway model V2 {APIGatewayProxyEventV2Model}")
        parsed_envelope: APIGatewayProxyEventV2Model = APIGatewayProxyEventV2Model.parse_obj(data)
        logger.debug(f"Parsing event payload in `detail` with {model}")
        return self._parse(data=parsed_envelope.body, model=model)

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> Optional[~Model]

Parses data found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

Any
Parsed detail payload with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
    """Parses data found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    Any
        Parsed detail payload with model provided
    """
    logger.debug(f"Parsing incoming data with Api Gateway model V2 {APIGatewayProxyEventV2Model}")
    parsed_envelope: APIGatewayProxyEventV2Model = APIGatewayProxyEventV2Model.parse_obj(data)
    logger.debug(f"Parsing event payload in `detail` with {model}")
    return self._parse(data=parsed_envelope.body, model=model)
class BaseEnvelope

ABC implementation for creating a supported Envelope

Expand source code
class BaseEnvelope(ABC):
    """ABC implementation for creating a supported Envelope"""

    @staticmethod
    def _parse(data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Union[Model, None]:
        """Parses envelope data against model provided

        Parameters
        ----------
        data : Dict
            Data to be parsed and validated
        model : Type[Model]
            Data model to parse and validate data against

        Returns
        -------
        Any
            Parsed data
        """
        disable_pydantic_v2_warning()

        if data is None:
            logger.debug("Skipping parsing as event is None")
            return data

        logger.debug("parsing event against model")
        if isinstance(data, str):
            logger.debug("parsing event as string")
            return model.parse_raw(data)

        return model.parse_obj(data)

    @abstractmethod
    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]):
        """Implementation to parse data against envelope model, then against the data model

        NOTE: Call `_parse` method to fully parse data with model provided.

        Example
        -------

        **EventBridge envelope implementation example**

        def parse(...):
            # 1. parses data against envelope model
            parsed_envelope = EventBridgeModel(**data)

            # 2. parses portion of data within the envelope against model
            return self._parse(data=parsed_envelope.detail, model=data_model)
        """
        return NotImplemented  # pragma: no cover

Ancestors

  • abc.ABC

Subclasses

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model])

Implementation to parse data against envelope model, then against the data model

NOTE: Call _parse method to fully parse data with model provided.

Example

EventBridge envelope implementation example

def parse(…): # 1. parses data against envelope model parsed_envelope = EventBridgeModel(**data)

# 2. parses portion of data within the envelope against model
return self._parse(data=parsed_envelope.detail, model=data_model)
Expand source code
@abstractmethod
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]):
    """Implementation to parse data against envelope model, then against the data model

    NOTE: Call `_parse` method to fully parse data with model provided.

    Example
    -------

    **EventBridge envelope implementation example**

    def parse(...):
        # 1. parses data against envelope model
        parsed_envelope = EventBridgeModel(**data)

        # 2. parses portion of data within the envelope against model
        return self._parse(data=parsed_envelope.detail, model=data_model)
    """
    return NotImplemented  # pragma: no cover
class BedrockAgentEnvelope

Bedrock Agent envelope to extract data within input_text key

Expand source code
class BedrockAgentEnvelope(BaseEnvelope):
    """Bedrock Agent envelope to extract data within input_text key"""

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        Optional[Model]
            Parsed detail payload with model provided
        """
        logger.debug(f"Parsing incoming data with Bedrock Agent model {BedrockAgentEventModel}")
        parsed_envelope: BedrockAgentEventModel = BedrockAgentEventModel.parse_obj(data)
        logger.debug(f"Parsing event payload in `input_text` with {model}")
        return self._parse(data=parsed_envelope.input_text, model=model)

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> Optional[~Model]

Parses data found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

Optional[Model]
Parsed detail payload with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
    """Parses data found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    Optional[Model]
        Parsed detail payload with model provided
    """
    logger.debug(f"Parsing incoming data with Bedrock Agent model {BedrockAgentEventModel}")
    parsed_envelope: BedrockAgentEventModel = BedrockAgentEventModel.parse_obj(data)
    logger.debug(f"Parsing event payload in `input_text` with {model}")
    return self._parse(data=parsed_envelope.input_text, model=model)
class CloudWatchLogsEnvelope

CloudWatch Envelope to extract a List of log records.

The record's body parameter is a string (after being base64 decoded and gzipped), though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.

Note: The record will be parsed the same way so if model is str

Expand source code
class CloudWatchLogsEnvelope(BaseEnvelope):
    """CloudWatch Envelope to extract a List of log records.

    The record's body parameter is a string (after being base64 decoded and gzipped),
    though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: The record will be parsed the same way so if model is str
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with SNS model {CloudWatchLogsModel}")
        parsed_envelope = CloudWatchLogsModel.parse_obj(data)
        logger.debug(f"Parsing CloudWatch records in `body` with {model}")
        return [
            self._parse(data=record.message, model=model) for record in parsed_envelope.awslogs.decoded_data.logEvents
        ]

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> List[Optional[~Model]]

Parses records found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with SNS model {CloudWatchLogsModel}")
    parsed_envelope = CloudWatchLogsModel.parse_obj(data)
    logger.debug(f"Parsing CloudWatch records in `body` with {model}")
    return [
        self._parse(data=record.message, model=model) for record in parsed_envelope.awslogs.decoded_data.logEvents
    ]
class DynamoDBStreamEnvelope

DynamoDB Stream Envelope to extract data within NewImage/OldImage

Note: Values are the parsed models. Images' values can also be None, and length of the list is the record's amount in the original event.

Expand source code
class DynamoDBStreamEnvelope(BaseEnvelope):
    """DynamoDB Stream Envelope to extract data within NewImage/OldImage

    Note: Values are the parsed models. Images' values can also be None, and
    length of the list is the record's amount in the original event.
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Dict[str, Optional[Model]]]:
        """Parses DynamoDB Stream records found in either NewImage and OldImage with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of dictionaries with NewImage and OldImage records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with DynamoDB Stream model {DynamoDBStreamModel}")
        parsed_envelope = DynamoDBStreamModel.parse_obj(data)
        logger.debug(f"Parsing DynamoDB Stream new and old records with {model}")
        return [
            {
                "NewImage": self._parse(data=record.dynamodb.NewImage, model=model),
                "OldImage": self._parse(data=record.dynamodb.OldImage, model=model),
            }
            for record in parsed_envelope.Records
        ]

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> List[Dict[str, Optional[~Model]]]

Parses DynamoDB Stream records found in either NewImage and OldImage with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of dictionaries with NewImage and OldImage records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Dict[str, Optional[Model]]]:
    """Parses DynamoDB Stream records found in either NewImage and OldImage with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of dictionaries with NewImage and OldImage records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with DynamoDB Stream model {DynamoDBStreamModel}")
    parsed_envelope = DynamoDBStreamModel.parse_obj(data)
    logger.debug(f"Parsing DynamoDB Stream new and old records with {model}")
    return [
        {
            "NewImage": self._parse(data=record.dynamodb.NewImage, model=model),
            "OldImage": self._parse(data=record.dynamodb.OldImage, model=model),
        }
        for record in parsed_envelope.Records
    ]
class EventBridgeEnvelope

EventBridge envelope to extract data within detail key

Expand source code
class EventBridgeEnvelope(BaseEnvelope):
    """EventBridge envelope to extract data within detail key"""

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        Any
            Parsed detail payload with model provided
        """
        logger.debug(f"Parsing incoming data with EventBridge model {EventBridgeModel}")
        parsed_envelope: EventBridgeModel = EventBridgeModel.parse_obj(data)
        logger.debug(f"Parsing event payload in `detail` with {model}")
        return self._parse(data=parsed_envelope.detail, model=model)

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> Optional[~Model]

Parses data found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

Any
Parsed detail payload with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
    """Parses data found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    Any
        Parsed detail payload with model provided
    """
    logger.debug(f"Parsing incoming data with EventBridge model {EventBridgeModel}")
    parsed_envelope: EventBridgeModel = EventBridgeModel.parse_obj(data)
    logger.debug(f"Parsing event payload in `detail` with {model}")
    return self._parse(data=parsed_envelope.detail, model=model)
class KafkaEnvelope

Kafka event envelope to extract data within body key The record's body parameter is a string, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.

Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa)

Expand source code
class KafkaEnvelope(BaseEnvelope):
    """Kafka event envelope to extract data within body key
    The record's body parameter is a string, though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and npt as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        event_source = cast(dict, data).get("eventSource")
        model_parse_event: Union[Type[KafkaMskEventModel], Type[KafkaSelfManagedEventModel]] = (
            KafkaMskEventModel if event_source == "aws:kafka" else KafkaSelfManagedEventModel
        )

        logger.debug(f"Parsing incoming data with Kafka event model {model_parse_event}")
        parsed_envelope = model_parse_event.parse_obj(data)
        logger.debug(f"Parsing Kafka event records in `value` with {model}")
        ret_list = []
        for records in parsed_envelope.records.values():
            ret_list += [self._parse(data=record.value, model=model) for record in records]
        return ret_list

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> List[Optional[~Model]]

Parses data found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses data found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    event_source = cast(dict, data).get("eventSource")
    model_parse_event: Union[Type[KafkaMskEventModel], Type[KafkaSelfManagedEventModel]] = (
        KafkaMskEventModel if event_source == "aws:kafka" else KafkaSelfManagedEventModel
    )

    logger.debug(f"Parsing incoming data with Kafka event model {model_parse_event}")
    parsed_envelope = model_parse_event.parse_obj(data)
    logger.debug(f"Parsing Kafka event records in `value` with {model}")
    ret_list = []
    for records in parsed_envelope.records.values():
        ret_list += [self._parse(data=record.value, model=model) for record in records]
    return ret_list
class KinesisDataStreamEnvelope

Kinesis Data Stream Envelope to extract array of Records

The record's data parameter is a base64 encoded string which is parsed into a bytes array, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.

Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and not as JSON (and vice versa)

Expand source code
class KinesisDataStreamEnvelope(BaseEnvelope):
    """Kinesis Data Stream Envelope to extract array of Records

    The record's data parameter is a base64 encoded string which is parsed into a bytes array,
    though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}")
        parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data)
        logger.debug(f"Parsing Kinesis records in `body` with {model}")
        models = []
        for record in parsed_envelope.Records:
            # We allow either AWS expected contract (bytes) or a custom Model, see #943
            data = cast(bytes, record.kinesis.data)
            models.append(self._parse(data=data.decode("utf-8"), model=model))
        return models

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> List[Optional[~Model]]

Parses records found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}")
    parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data)
    logger.debug(f"Parsing Kinesis records in `body` with {model}")
    models = []
    for record in parsed_envelope.Records:
        # We allow either AWS expected contract (bytes) or a custom Model, see #943
        data = cast(bytes, record.kinesis.data)
        models.append(self._parse(data=data.decode("utf-8"), model=model))
    return models
class KinesisFirehoseEnvelope

Kinesis Firehose Envelope to extract array of Records

The record's data parameter is a base64 encoded string which is parsed into a bytes array, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.

Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and not as JSON (and vice versa)

https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html

Expand source code
class KinesisFirehoseEnvelope(BaseEnvelope):
    """Kinesis Firehose Envelope to extract array of Records

    The record's data parameter is a base64 encoded string which is parsed into a bytes array,
    though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa)

    https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with Kinesis Firehose model {KinesisFirehoseModel}")
        parsed_envelope: KinesisFirehoseModel = KinesisFirehoseModel.parse_obj(data)
        logger.debug(f"Parsing Kinesis Firehose records in `body` with {model}")
        models = []
        for record in parsed_envelope.records:
            # We allow either AWS expected contract (bytes) or a custom Model, see #943
            data = cast(bytes, record.data)
            models.append(self._parse(data=data.decode("utf-8"), model=model))
        return models

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> List[Optional[~Model]]

Parses records found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with Kinesis Firehose model {KinesisFirehoseModel}")
    parsed_envelope: KinesisFirehoseModel = KinesisFirehoseModel.parse_obj(data)
    logger.debug(f"Parsing Kinesis Firehose records in `body` with {model}")
    models = []
    for record in parsed_envelope.records:
        # We allow either AWS expected contract (bytes) or a custom Model, see #943
        data = cast(bytes, record.data)
        models.append(self._parse(data=data.decode("utf-8"), model=model))
    return models
class LambdaFunctionUrlEnvelope

Lambda function URL envelope to extract data within body key

Expand source code
class LambdaFunctionUrlEnvelope(BaseEnvelope):
    """Lambda function URL envelope to extract data within body key"""

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        Any
            Parsed detail payload with model provided
        """
        logger.debug(f"Parsing incoming data with Lambda function URL model {LambdaFunctionUrlModel}")
        parsed_envelope: LambdaFunctionUrlModel = LambdaFunctionUrlModel.parse_obj(data)
        logger.debug(f"Parsing event payload in `detail` with {model}")
        return self._parse(data=parsed_envelope.body, model=model)

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> Optional[~Model]

Parses data found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

Any
Parsed detail payload with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
    """Parses data found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    Any
        Parsed detail payload with model provided
    """
    logger.debug(f"Parsing incoming data with Lambda function URL model {LambdaFunctionUrlModel}")
    parsed_envelope: LambdaFunctionUrlModel = LambdaFunctionUrlModel.parse_obj(data)
    logger.debug(f"Parsing event payload in `detail` with {model}")
    return self._parse(data=parsed_envelope.body, model=model)
class SnsEnvelope

SNS Envelope to extract array of Records

The record's body parameter is a string, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.

Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa)

Expand source code
class SnsEnvelope(BaseEnvelope):
    """SNS Envelope to extract array of Records

    The record's body parameter is a string, though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and npt as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with SNS model {SnsModel}")
        parsed_envelope = SnsModel.parse_obj(data)
        logger.debug(f"Parsing SNS records in `body` with {model}")
        return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records]

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> List[Optional[~Model]]

Parses records found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with SNS model {SnsModel}")
    parsed_envelope = SnsModel.parse_obj(data)
    logger.debug(f"Parsing SNS records in `body` with {model}")
    return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records]
class SnsSqsEnvelope

SNS plus SQS Envelope to extract array of Records

Published messages from SNS to SQS has a slightly different payload. Since SNS payload is marshalled into Record key in SQS, we have to:

  1. Parse SQS schema with incoming data
  2. Unmarshall SNS payload and parse against SNS Notification model not SNS/SNS Record
  3. Finally, parse provided model against payload extracted
Expand source code
class SnsSqsEnvelope(BaseEnvelope):
    """SNS plus SQS Envelope to extract array of Records

    Published messages from SNS to SQS has a slightly different payload.
    Since SNS payload is marshalled into `Record` key in SQS, we have to:

    1. Parse SQS schema with incoming data
    2. Unmarshall SNS payload and parse against SNS Notification model not SNS/SNS Record
    3. Finally, parse provided model against payload extracted
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
        parsed_envelope = SqsModel.parse_obj(data)
        output = []
        for record in parsed_envelope.Records:
            # We allow either AWS expected contract (str) or a custom Model, see #943
            body = cast(str, record.body)
            sns_notification = SnsNotificationModel.parse_raw(body)
            output.append(self._parse(data=sns_notification.Message, model=model))
        return output

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> List[Optional[~Model]]

Parses records found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
    parsed_envelope = SqsModel.parse_obj(data)
    output = []
    for record in parsed_envelope.Records:
        # We allow either AWS expected contract (str) or a custom Model, see #943
        body = cast(str, record.body)
        sns_notification = SnsNotificationModel.parse_raw(body)
        output.append(self._parse(data=sns_notification.Message, model=model))
    return output
class SqsEnvelope

SQS Envelope to extract array of Records

The record's body parameter is a string, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.

Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa)

Expand source code
class SqsEnvelope(BaseEnvelope):
    """SQS Envelope to extract array of Records

    The record's body parameter is a string, though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and npt as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
        parsed_envelope = SqsModel.parse_obj(data)
        logger.debug(f"Parsing SQS records in `body` with {model}")
        return [self._parse(data=record.body, model=model) for record in parsed_envelope.Records]

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> List[Optional[~Model]]

Parses records found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

List
List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
    parsed_envelope = SqsModel.parse_obj(data)
    logger.debug(f"Parsing SQS records in `body` with {model}")
    return [self._parse(data=record.body, model=model) for record in parsed_envelope.Records]
class VpcLatticeEnvelope

Amazon VPC Lattice envelope to extract data within body key

Expand source code
class VpcLatticeEnvelope(BaseEnvelope):
    """Amazon VPC Lattice envelope to extract data within body key"""

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        Optional[Model]
            Parsed detail payload with model provided
        """
        logger.debug(f"Parsing incoming data with VPC Lattice model {VpcLatticeModel}")
        parsed_envelope: VpcLatticeModel = VpcLatticeModel.parse_obj(data)
        logger.debug(f"Parsing event payload in `detail` with {model}")
        return self._parse(data=parsed_envelope.body, model=model)

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> Optional[~Model]

Parses data found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

Optional[Model]
Parsed detail payload with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
    """Parses data found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    Optional[Model]
        Parsed detail payload with model provided
    """
    logger.debug(f"Parsing incoming data with VPC Lattice model {VpcLatticeModel}")
    parsed_envelope: VpcLatticeModel = VpcLatticeModel.parse_obj(data)
    logger.debug(f"Parsing event payload in `detail` with {model}")
    return self._parse(data=parsed_envelope.body, model=model)
class VpcLatticeV2Envelope

Amazon VPC Lattice envelope to extract data within body key

Expand source code
class VpcLatticeV2Envelope(BaseEnvelope):
    """Amazon VPC Lattice envelope to extract data within body key"""

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
        """Parses data found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        Optional[Model]
            Parsed detail payload with model provided
        """
        logger.debug(f"Parsing incoming data with VPC Lattice V2 model {VpcLatticeV2Model}")
        parsed_envelope: VpcLatticeV2Model = VpcLatticeV2Model.parse_obj(data)
        logger.debug(f"Parsing event payload in `detail` with {model}")
        return self._parse(data=parsed_envelope.body, model=model)

Ancestors

Methods

def parse(self, data: Union[Dict[str, Any], Any, ForwardRef(None)], model: Type[~Model]) ‑> Optional[~Model]

Parses data found with model provided

Parameters

data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns

Optional[Model]
Parsed detail payload with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
    """Parses data found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    Optional[Model]
        Parsed detail payload with model provided
    """
    logger.debug(f"Parsing incoming data with VPC Lattice V2 model {VpcLatticeV2Model}")
    parsed_envelope: VpcLatticeV2Model = VpcLatticeV2Model.parse_obj(data)
    logger.debug(f"Parsing event payload in `detail` with {model}")
    return self._parse(data=parsed_envelope.body, model=model)