Module aws_lambda_powertools.utilities.parser.models.sqs
Classes
class SqsAttributesModel (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SqsAttributesModel(BaseModel): ApproximateReceiveCount: str ApproximateFirstReceiveTimestamp: datetime MessageDeduplicationId: Optional[str] = None MessageGroupId: Optional[str] = None SenderId: str SentTimestamp: datetime SequenceNumber: Optional[str] = None AWSTraceHeader: Optional[str] = None DeadLetterQueueSourceArn: Optional[str] = None
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var AWSTraceHeader : Optional[str]
var ApproximateFirstReceiveTimestamp : datetime.datetime
var ApproximateReceiveCount : str
var DeadLetterQueueSourceArn : Optional[str]
var MessageDeduplicationId : Optional[str]
var MessageGroupId : Optional[str]
var SenderId : str
var SentTimestamp : datetime.datetime
var SequenceNumber : Optional[str]
class SqsModel (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SqsModel(BaseModel): Records: Sequence[SqsRecordModel]
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
Class variables
var Records : Sequence[SqsRecordModel]
class SqsMsgAttributeModel (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SqsMsgAttributeModel(BaseModel): stringValue: Optional[str] = None binaryValue: Optional[str] = None stringListValues: List[str] = [] binaryListValues: List[str] = [] dataType: str # context on why it's commented: https://github.com/aws-powertools/powertools-lambda-python/pull/118 # Amazon SQS supports the logical data types String, Number, and Binary with optional custom data type # labels with the format .custom-data-type. # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes # @validator("dataType") # def valid_type(cls, v): # noqa: VNE001,ERA001 # noqa: ERA001 # pattern = re.compile("Number.*|String.*|Binary.*") # noqa: ERA001 # if not pattern.match(v): # noqa: ERA001 # raise TypeError("data type is invalid") # noqa: ERA001 # return v # noqa: ERA001 # # # validate that dataType and value are not None and match # @root_validator # def check_str_and_binary_values(cls, values): # noqa: ERA001 # binary_val, str_val = values.get("binaryValue", ""), values.get("stringValue", "") # noqa: ERA001 # data_type = values.get("dataType") # noqa: ERA001 # if not str_val and not binary_val: # noqa: ERA001 # raise TypeError("both binaryValue and stringValue are missing") # noqa: ERA001 # if data_type.startswith("Binary") and not binary_val: # noqa: ERA001 # raise TypeError("binaryValue is missing") # noqa: ERA001 # if (data_type.startswith("String") or data_type.startswith("Number")) and not str_val: # noqa: ERA001 # raise TypeError("stringValue is missing") # noqa: ERA001 # return values # noqa: ERA001
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var binaryListValues : List[str]
var binaryValue : Optional[str]
var dataType : str
var stringListValues : List[str]
var stringValue : Optional[str]
class SqsRecordModel (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SqsRecordModel(BaseModel): messageId: str receiptHandle: str body: Union[str, Type[BaseModel], BaseModel] attributes: SqsAttributesModel messageAttributes: Dict[str, SqsMsgAttributeModel] md5OfBody: str md5OfMessageAttributes: Optional[str] = None eventSource: Literal["aws:sqs"] eventSourceARN: str awsRegion: str
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
Class variables
var attributes : SqsAttributesModel
var awsRegion : str
var body : Union[str, Type[pydantic.main.BaseModel], pydantic.main.BaseModel]
var eventSource : Literal['aws:sqs']
var eventSourceARN : str
var md5OfBody : str
var md5OfMessageAttributes : Optional[str]
var messageAttributes : Dict[str, SqsMsgAttributeModel]
var messageId : str
var receiptHandle : str