Module aws_lambda_powertools.utilities.parser.models.sns
Expand source code
from datetime import datetime
from typing import Dict, List, Optional, Union
from typing import Type as TypingType
from pydantic import BaseModel, root_validator
from pydantic.networks import HttpUrl
from aws_lambda_powertools.utilities.parser.types import Literal
class SnsMsgAttributeModel(BaseModel):
    # Value type stored in the ``MessageAttributes`` mapping of
    # SnsNotificationModel. Both fields are required strings.
    # (A ``#`` comment is used instead of a class docstring so the generated
    # pydantic schema for this model is left untouched.)
    Type: str
    Value: str
class SnsNotificationModel(BaseModel):
    # Payload of a single SNS notification; used as the ``Sns`` field of
    # SnsRecordModel. ``Type`` only accepts the literal "Notification".
    # (A ``#`` comment is used instead of a class docstring so the generated
    # pydantic schema for this model is left untouched.)
    Subject: Optional[str] = None
    TopicArn: str
    UnsubscribeUrl: HttpUrl
    Type: Literal["Notification"]
    MessageAttributes: Optional[Dict[str, SnsMsgAttributeModel]] = None
    Message: Union[str, TypingType[BaseModel]]
    MessageId: str
    SigningCertUrl: Optional[HttpUrl] = None  # NOTE: FIFO opt-in removes attribute
    Signature: Optional[str] = None  # NOTE: FIFO opt-in removes attribute
    Timestamp: datetime
    SignatureVersion: Optional[str] = None  # NOTE: FIFO opt-in removes attribute

    @root_validator(pre=True, allow_reuse=True)
    def check_sqs_protocol(cls, values):
        """Map the upper-case ``...URL`` key spellings onto the model's field names.

        Runs before field validation. When the raw input contains
        ``UnsubscribeURL`` and/or ``SigningCertURL`` (the spellings the original
        author attributes to SQS-delivered payloads), both are moved to
        ``UnsubscribeUrl`` / ``SigningCertUrl``. Input without either key is
        returned unchanged.
        """
        renamed_keys = {
            "UnsubscribeURL": "UnsubscribeUrl",
            "SigningCertURL": "SigningCertUrl",
        }
        if not any(key in renamed_keys for key in values):
            return values

        for rewritten_key, field_name in renamed_keys.items():
            # Popping with a ``None`` default means a missing key never raises
            # KeyError here; the required ``UnsubscribeUrl`` field then rejects
            # ``None`` during validation, surfacing as a ValidationError.
            values[field_name] = values.pop(rewritten_key, None)
        return values
class SnsRecordModel(BaseModel):
    # One element of ``SnsModel.Records``. ``EventSource`` only accepts the
    # literal "aws:sns"; the notification itself is nested under ``Sns``.
    # (A ``#`` comment is used instead of a class docstring so the generated
    # pydantic schema for this model is left untouched.)
    EventSource: Literal["aws:sns"]
    EventVersion: str
    EventSubscriptionArn: str
    Sns: SnsNotificationModel
class SnsModel(BaseModel):
    # Top-level model: a required ``Records`` list of SnsRecordModel items.
    # (A ``#`` comment is used instead of a class docstring so the generated
    # pydantic schema for this model is left untouched.)
    Records: List[SnsRecordModel]
Classes
class SnsModel (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SnsModel(BaseModel):
    # Top-level model: a required ``Records`` list of SnsRecordModel items.
    Records: List[SnsRecordModel]
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Records : List[SnsRecordModel]
class SnsMsgAttributeModel (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SnsMsgAttributeModel(BaseModel):
    # Value type stored in the ``MessageAttributes`` mapping of
    # SnsNotificationModel. Both fields are required strings.
    Type: str
    Value: str
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Type : str
var Value : str
class SnsNotificationModel (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SnsNotificationModel(BaseModel):
    # Payload of a single SNS notification; used as the ``Sns`` field of
    # SnsRecordModel. ``Type`` only accepts the literal "Notification".
    Subject: Optional[str] = None
    TopicArn: str
    UnsubscribeUrl: HttpUrl
    Type: Literal["Notification"]
    MessageAttributes: Optional[Dict[str, SnsMsgAttributeModel]] = None
    Message: Union[str, TypingType[BaseModel]]
    MessageId: str
    SigningCertUrl: Optional[HttpUrl] = None  # NOTE: FIFO opt-in removes attribute
    Signature: Optional[str] = None  # NOTE: FIFO opt-in removes attribute
    Timestamp: datetime
    SignatureVersion: Optional[str] = None  # NOTE: FIFO opt-in removes attribute

    @root_validator(pre=True, allow_reuse=True)
    def check_sqs_protocol(cls, values):
        """Map the upper-case ``...URL`` key spellings onto the model's field names.

        Runs before field validation. When the raw input contains
        ``UnsubscribeURL`` and/or ``SigningCertURL``, both are moved to
        ``UnsubscribeUrl`` / ``SigningCertUrl``. Input without either key is
        returned unchanged.
        """
        renamed_keys = {
            "UnsubscribeURL": "UnsubscribeUrl",
            "SigningCertURL": "SigningCertUrl",
        }
        if not any(key in renamed_keys for key in values):
            return values

        for rewritten_key, field_name in renamed_keys.items():
            # ``None`` default: a missing key never raises KeyError here; the
            # required ``UnsubscribeUrl`` field then fails validation instead.
            values[field_name] = values.pop(rewritten_key, None)
        return values
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Message : Union[str, Type[pydantic.main.BaseModel]]
var MessageAttributes : Optional[Dict[str, SnsMsgAttributeModel]]
var MessageId : str
var Signature : Optional[str]
var SignatureVersion : Optional[str]
var SigningCertUrl : Optional[pydantic.networks.HttpUrl]
var Subject : Optional[str]
var Timestamp : datetime.datetime
var TopicArn : str
var Type : Literal['Notification']
var UnsubscribeUrl : pydantic.networks.HttpUrl
Static methods
def check_sqs_protocol(values)
-
Expand source code
@root_validator(pre=True, allow_reuse=True)
def check_sqs_protocol(cls, values):
    """Map the upper-case ``...URL`` key spellings onto the model's field names.

    Runs before field validation. When the raw input contains
    ``UnsubscribeURL`` and/or ``SigningCertURL``, both are moved to
    ``UnsubscribeUrl`` / ``SigningCertUrl``. Input without either key is
    returned unchanged.
    """
    renamed_keys = {
        "UnsubscribeURL": "UnsubscribeUrl",
        "SigningCertURL": "SigningCertUrl",
    }
    if not any(key in renamed_keys for key in values):
        return values

    for rewritten_key, field_name in renamed_keys.items():
        # ``None`` default: a missing key never raises KeyError here; a
        # ``None`` value is left for field validation to accept or reject.
        values[field_name] = values.pop(rewritten_key, None)
    return values
class SnsRecordModel (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SnsRecordModel(BaseModel):
    # One element of ``SnsModel.Records``. ``EventSource`` only accepts the
    # literal "aws:sns"; the notification itself is nested under ``Sns``.
    EventSource: Literal["aws:sns"]
    EventVersion: str
    EventSubscriptionArn: str
    Sns: SnsNotificationModel
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var EventSource : Literal['aws:sns']
var EventSubscriptionArn : str
var EventVersion : str
var Sns : SnsNotificationModel