Module aws_lambda_powertools.utilities.parser.envelopes
Expand source code
from .base import BaseEnvelope
from .cloudwatch import CloudWatchLogsEnvelope
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
from .kinesis import KinesisDataStreamEnvelope
from .sns import SnsEnvelope, SnsSqsEnvelope
from .sqs import SqsEnvelope
# Public API of this package: the envelope classes imported from the sub-modules above.
__all__ = [
    "CloudWatchLogsEnvelope",
    "DynamoDBStreamEnvelope",
    "EventBridgeEnvelope",
    "KinesisDataStreamEnvelope",
    "SnsEnvelope",
    "SnsSqsEnvelope",
    "SqsEnvelope",
    "BaseEnvelope",
]
Sub-modules
aws_lambda_powertools.utilities.parser.envelopes.base
aws_lambda_powertools.utilities.parser.envelopes.cloudwatch
aws_lambda_powertools.utilities.parser.envelopes.dynamodb
aws_lambda_powertools.utilities.parser.envelopes.event_bridge
aws_lambda_powertools.utilities.parser.envelopes.kinesis
aws_lambda_powertools.utilities.parser.envelopes.sns
aws_lambda_powertools.utilities.parser.envelopes.sqs
Classes
class BaseEnvelope
-
ABC implementation for creating a supported Envelope
Expand source code
class BaseEnvelope(ABC):
    """Abstract base class every envelope implementation derives from

    Concrete envelopes implement `parse` to unwrap the incoming event and
    can hand the unwrapped payload to `_parse` for validation against the
    caller's model.
    """

    @staticmethod
    def _parse(data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Union[Model, None]:
        """Validate a payload against the model provided

        Parameters
        ----------
        data : Dict
            Payload to parse and validate; a str is passed to `model.parse_raw`,
            any other non-None value to `model.parse_obj`
        model : Type[Model]
            Data model the payload is parsed and validated against

        Returns
        -------
        Any
            Parsed model instance, or None when `data` is None
        """
        # Nothing to validate: propagate the missing payload untouched
        if data is None:
            logger.debug("Skipping parsing as event is None")
            return None

        logger.debug("parsing event against model")
        if not isinstance(data, str):
            return model.parse_obj(data)

        # String payloads go through the raw parser instead of parse_obj
        logger.debug("parsing event as string")
        return model.parse_raw(data)

    @abstractmethod
    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]):
        """Parse data against the envelope model first, then against the data model

        NOTE: Implementations should call `_parse` to fully parse the
        extracted payload with the model provided.

        Example
        -------

        **EventBridge envelope implementation example**

        def parse(...):
            # 1. parses data against envelope model
            parsed_envelope = EventBridgeModel(**data)

            # 2. parses portion of data within the envelope against model
            return self._parse(data=parsed_envelope.detail, model=data_model)
        """
        return NotImplemented  # pragma: no cover
Ancestors
- abc.ABC
Subclasses
- CloudWatchLogsEnvelope
- DynamoDBStreamEnvelope
- EventBridgeEnvelope
- KinesisDataStreamEnvelope
- SnsEnvelope
- SnsSqsEnvelope
- SqsEnvelope
Methods
def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model])
-
Implementation to parse data against envelope model, then against the data model
NOTE: Call the `_parse` method to fully parse data with the model provided.
Example
EventBridge envelope implementation example
def parse(...):
    # 1. parses data against envelope model
    parsed_envelope = EventBridgeModel(**data)

    # 2. parses portion of data within the envelope against model
    return self._parse(data=parsed_envelope.detail, model=data_model)
Expand source code
@abstractmethod
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]):
    """Parse data against the envelope model first, then against the data model

    NOTE: Implementations should call `_parse` to fully parse the
    extracted payload with the model provided.

    Example
    -------

    **EventBridge envelope implementation example**

    def parse(...):
        # 1. parses data against envelope model
        parsed_envelope = EventBridgeModel(**data)

        # 2. parses portion of data within the envelope against model
        return self._parse(data=parsed_envelope.detail, model=data_model)
    """
    return NotImplemented  # pragma: no cover
class CloudWatchLogsEnvelope
-
CloudWatch Envelope to extract a List of log records.
The record's message parameter is a string (available after the payload has been base64 decoded and decompressed), though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.
Note: All records will be parsed the same way, so if model is str, all items in the list will be parsed as str and not as JSON (and vice versa)
Expand source code
class CloudWatchLogsEnvelope(BaseEnvelope):
    """CloudWatch Logs envelope to extract a list of log records.

    The event is validated with `CloudWatchLogsModel`, and every entry in
    `awslogs.decoded_data.logEvents` has its `message` parsed with the model
    provided. The message is a string, though it can also be a JSON encoded
    string; either way it is parsed into a BaseModel object.

    Note: All records are parsed the same way, so if model is str, all items
    in the list will be parsed as str and not as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        # The message used to say "SNS model" (copy-paste slip); this envelope uses the CloudWatch Logs model
        logger.debug(f"Parsing incoming data with CloudWatch Logs model {CloudWatchLogsModel}")
        parsed_envelope = CloudWatchLogsModel.parse_obj(data)
        # The payload of each log event lives in `message`, not `body`
        logger.debug(f"Parsing CloudWatch records in `message` with {model}")
        return [
            self._parse(data=record.message, model=model)
            for record in parsed_envelope.awslogs.decoded_data.logEvents
        ]
Ancestors
- BaseEnvelope
- abc.ABC
Methods
def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Union[~Model, NoneType]]
-
Parses records found with model provided
Parameters
data
:Dict
- Lambda event to be parsed
model
:Type[Model]
- Data model provided to parse after extracting data using envelope
Returns
List
- List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    # The message used to say "SNS model" (copy-paste slip); this envelope uses the CloudWatch Logs model
    logger.debug(f"Parsing incoming data with CloudWatch Logs model {CloudWatchLogsModel}")
    parsed_envelope = CloudWatchLogsModel.parse_obj(data)
    # The payload of each log event lives in `message`, not `body`
    logger.debug(f"Parsing CloudWatch records in `message` with {model}")
    return [
        self._parse(data=record.message, model=model)
        for record in parsed_envelope.awslogs.decoded_data.logEvents
    ]
class DynamoDBStreamEnvelope
-
DynamoDB Stream Envelope to extract data within NewImage/OldImage
Note: Values are the parsed models. An image's value can also be None, and the length of the list equals the number of records in the original event.
Expand source code
class DynamoDBStreamEnvelope(BaseEnvelope):
    """DynamoDB Stream envelope that extracts the NewImage/OldImage of each record

    Note: The result holds one dictionary per record of the original event;
    its values are the parsed models, and an image that is absent stays None.
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Dict[str, Optional[Model]]]:
        """Parse the NewImage and OldImage of every DynamoDB Stream record with the model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model used to parse each image once extracted by the envelope

        Returns
        -------
        List
            One dictionary per record with its parsed `NewImage` and `OldImage`
        """
        logger.debug(f"Parsing incoming data with DynamoDB Stream model {DynamoDBStreamModel}")
        stream_event = DynamoDBStreamModel.parse_obj(data)
        logger.debug(f"Parsing DynamoDB Stream new and old records with {model}")

        images = []
        for record in stream_event.Records:
            change = record.dynamodb
            # Keep the original order: the new image is parsed before the old one
            new_image = self._parse(data=change.NewImage, model=model)
            old_image = self._parse(data=change.OldImage, model=model)
            images.append({"NewImage": new_image, "OldImage": old_image})
        return images
Ancestors
- BaseEnvelope
- abc.ABC
Methods
def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Dict[str, Union[~Model, NoneType]]]
-
Parses DynamoDB Stream records found in both NewImage and OldImage with model provided
Parameters
data
:Dict
- Lambda event to be parsed
model
:Type[Model]
- Data model provided to parse after extracting data using envelope
Returns
List
- List of dictionaries with NewImage and OldImage records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Dict[str, Optional[Model]]]:
    """Parse the NewImage and OldImage of every DynamoDB Stream record with the model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model used to parse each image once extracted by the envelope

    Returns
    -------
    List
        One dictionary per record with its parsed `NewImage` and `OldImage`
    """
    logger.debug(f"Parsing incoming data with DynamoDB Stream model {DynamoDBStreamModel}")
    stream_event = DynamoDBStreamModel.parse_obj(data)
    logger.debug(f"Parsing DynamoDB Stream new and old records with {model}")

    images = []
    for record in stream_event.Records:
        change = record.dynamodb
        # Keep the original order: the new image is parsed before the old one
        new_image = self._parse(data=change.NewImage, model=model)
        old_image = self._parse(data=change.OldImage, model=model)
        images.append({"NewImage": new_image, "OldImage": old_image})
    return images
class EventBridgeEnvelope
-
EventBridge envelope to extract data within detail key
Expand source code
class EventBridgeEnvelope(BaseEnvelope):
    """EventBridge envelope that extracts the payload stored under the `detail` key"""

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
        """Parse the `detail` payload of the event with the model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model used to parse the payload once extracted by the envelope

        Returns
        -------
        Any
            `detail` payload parsed with the model provided
        """
        logger.debug(f"Parsing incoming data with EventBridge model {EventBridgeModel}")
        event = EventBridgeModel.parse_obj(data)

        logger.debug(f"Parsing event payload in `detail` with {model}")
        detail = event.detail
        return self._parse(data=detail, model=model)
Ancestors
- BaseEnvelope
- abc.ABC
Methods
def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> Union[~Model, NoneType]
-
Parses data found with model provided
Parameters
data
:Dict
- Lambda event to be parsed
model
:Type[Model]
- Data model provided to parse after extracting data using envelope
Returns
Any
- Parsed detail payload with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]:
    """Parse the `detail` payload of the event with the model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model used to parse the payload once extracted by the envelope

    Returns
    -------
    Any
        `detail` payload parsed with the model provided
    """
    logger.debug(f"Parsing incoming data with EventBridge model {EventBridgeModel}")
    event = EventBridgeModel.parse_obj(data)

    logger.debug(f"Parsing event payload in `detail` with {model}")
    detail = event.detail
    return self._parse(data=detail, model=model)
class KinesisDataStreamEnvelope
-
Kinesis Data Stream Envelope to extract array of Records
The record's data parameter is a base64 encoded string which is parsed into a bytes array, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.
Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and not as JSON (and vice versa)
Expand source code
class KinesisDataStreamEnvelope(BaseEnvelope):
    """Kinesis Data Stream Envelope to extract array of Records

    The record's data parameter is a base64 encoded string which is parsed into a bytes array,
    though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}")
        parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data)
        # The payload is read from `kinesis.data`; the message previously pointed at a non-existent `body`
        logger.debug(f"Parsing Kinesis records in `kinesis.data` with {model}")
        return [
            # `data` is decoded as UTF-8 so `_parse` receives a str and uses the raw parser
            self._parse(data=record.kinesis.data.decode("utf-8"), model=model)
            for record in parsed_envelope.Records
        ]
Ancestors
- BaseEnvelope
- abc.ABC
Methods
def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Union[~Model, NoneType]]
-
Parses records found with model provided
Parameters
data
:Dict
- Lambda event to be parsed
model
:Type[Model]
- Data model provided to parse after extracting data using envelope
Returns
List
- List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}")
    parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data)
    # The payload is read from `kinesis.data`; the message previously pointed at a non-existent `body`
    logger.debug(f"Parsing Kinesis records in `kinesis.data` with {model}")
    return [
        # `data` is decoded as UTF-8 so `_parse` receives a str and uses the raw parser
        self._parse(data=record.kinesis.data.decode("utf-8"), model=model)
        for record in parsed_envelope.Records
    ]
class SnsEnvelope
-
SNS Envelope to extract array of Records
The record's body parameter is a string, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.
Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and not as JSON (and vice versa)
Expand source code
class SnsEnvelope(BaseEnvelope):
    """SNS Envelope to extract array of Records

    The record's `Sns.Message` parameter is a string, though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with SNS model {SnsModel}")
        parsed_envelope = SnsModel.parse_obj(data)
        # The payload is read from `Sns.Message`; the message previously pointed at a non-existent `body`
        logger.debug(f"Parsing SNS records in `Sns.Message` with {model}")
        return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records]
Ancestors
- BaseEnvelope
- abc.ABC
Methods
def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Union[~Model, NoneType]]
-
Parses records found with model provided
Parameters
data
:Dict
- Lambda event to be parsed
model
:Type[Model]
- Data model provided to parse after extracting data using envelope
Returns
List
- List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parses records found with model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model provided to parse after extracting data using envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with SNS model {SnsModel}")
    parsed_envelope = SnsModel.parse_obj(data)
    # The payload is read from `Sns.Message`; the message previously pointed at a non-existent `body`
    logger.debug(f"Parsing SNS records in `Sns.Message` with {model}")
    return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records]
class SnsSqsEnvelope
-
SNS plus SQS Envelope to extract array of Records
Messages published from SNS to SQS have a slightly different payload. Since the SNS payload is marshalled into the
Record
key in SQS, we have to:
- Parse SQS schema with incoming data
- Unmarshall SNS payload and parse against SNS Notification model not SNS/SNS Record
- Finally, parse provided model against payload extracted
Expand source code
class SnsSqsEnvelope(BaseEnvelope):
    """SNS plus SQS envelope that extracts the SNS message carried by each SQS record

    Messages published from SNS to SQS arrive wrapped: the SNS notification is
    serialized inside each SQS record's `body`. Parsing therefore happens in
    three steps:

    1. Parse the incoming event with the SQS model
    2. Parse each record's `body` with the SNS Notification model
    3. Parse the notification's `Message` with the model provided
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parse the SNS message of every SQS record with the model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model used to parse each message once extracted by the envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
        sqs_event = SqsModel.parse_obj(data)
        # Per record: unwrap the SNS notification from the body, then validate its Message
        return [
            self._parse(data=SnsNotificationModel.parse_raw(record.body).Message, model=model)
            for record in sqs_event.Records
        ]
Ancestors
- BaseEnvelope
- abc.ABC
Methods
def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Union[~Model, NoneType]]
-
Parses records found with model provided
Parameters
data
:Dict
- Lambda event to be parsed
model
:Type[Model]
- Data model provided to parse after extracting data using envelope
Returns
List
- List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parse the SNS message of every SQS record with the model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model used to parse each message once extracted by the envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
    sqs_event = SqsModel.parse_obj(data)
    # Per record: unwrap the SNS notification from the body, then validate its Message
    return [
        self._parse(data=SnsNotificationModel.parse_raw(record.body).Message, model=model)
        for record in sqs_event.Records
    ]
class SqsEnvelope
-
SQS Envelope to extract array of Records
The record's body parameter is a string, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object.
Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and not as JSON (and vice versa)
Expand source code
class SqsEnvelope(BaseEnvelope):
    """SQS envelope that extracts the `body` of each record

    A record's body is a string, which may itself be JSON encoded; in both
    cases it is parsed into a BaseModel object.

    Note: Every record is parsed the same way, so if model is str, all items
    in the list are parsed as str and not as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parse the body of every SQS record with the model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model used to parse each body once extracted by the envelope

        Returns
        -------
        List
            List of records parsed with model provided
        """
        logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
        sqs_event = SqsModel.parse_obj(data)
        logger.debug(f"Parsing SQS records in `body` with {model}")

        parsed_bodies = []
        for record in sqs_event.Records:
            parsed_bodies.append(self._parse(data=record.body, model=model))
        return parsed_bodies
Ancestors
- BaseEnvelope
- abc.ABC
Methods
def parse(self, data: Union[Dict[str, Any], Any, NoneType], model: Type[~Model]) ‑> List[Union[~Model, NoneType]]
-
Parses records found with model provided
Parameters
data
:Dict
- Lambda event to be parsed
model
:Type[Model]
- Data model provided to parse after extracting data using envelope
Returns
List
- List of records parsed with model provided
Expand source code
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
    """Parse the body of every SQS record with the model provided

    Parameters
    ----------
    data : Dict
        Lambda event to be parsed
    model : Type[Model]
        Data model used to parse each body once extracted by the envelope

    Returns
    -------
    List
        List of records parsed with model provided
    """
    logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
    sqs_event = SqsModel.parse_obj(data)
    logger.debug(f"Parsing SQS records in `body` with {model}")

    parsed_bodies = []
    for record in sqs_event.Records:
        parsed_bodies.append(self._parse(data=record.body, model=model))
    return parsed_bodies