Module aws_lambda_powertools.utilities.parser.models.sqs

Classes

class SqsAttributesModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

class SqsAttributesModel(BaseModel):
    ApproximateReceiveCount: str
    ApproximateFirstReceiveTimestamp: datetime
    MessageDeduplicationId: Optional[str] = None
    MessageGroupId: Optional[str] = None
    SenderId: str
    SentTimestamp: datetime
    SequenceNumber: Optional[str] = None
    AWSTraceHeader: Optional[str] = None
    DeadLetterQueueSourceArn: Optional[str] = None

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var AWSTraceHeader : Optional[str]
var ApproximateFirstReceiveTimestamp : datetime.datetime
var ApproximateReceiveCount : str
var DeadLetterQueueSourceArn : Optional[str]
var MessageDeduplicationId : Optional[str]
var MessageGroupId : Optional[str]
var SenderId : str
var SentTimestamp : datetime.datetime
var SequenceNumber : Optional[str]
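
SQS delivers SentTimestamp and ApproximateFirstReceiveTimestamp as epoch-millisecond strings and ApproximateReceiveCount as a numeric string, which is presumably why the model declares the first two as datetime and keeps the count as str. The stdlib-only sketch below shows roughly what that conversion amounts to. The helper name epoch_ms_to_datetime and the sample values are illustrative assumptions, not part of Powertools or pydantic.

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as an aware UTC datetime; adding a timedelta avoids float rounding.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_ms_to_datetime(value: str) -> datetime:
    """Convert an epoch-milliseconds string to an aware UTC datetime.

    Hypothetical helper for illustration only.
    """
    return EPOCH + timedelta(milliseconds=int(value))


# Sample attribute values, made up for illustration.
attributes = {
    "ApproximateReceiveCount": "1",
    "SentTimestamp": "1545082649183",
    "ApproximateFirstReceiveTimestamp": "1545082649185",
}

sent_at = epoch_ms_to_datetime(attributes["SentTimestamp"])
first_received_at = epoch_ms_to_datetime(attributes["ApproximateFirstReceiveTimestamp"])

# ApproximateReceiveCount stays a string in the model, so cast before comparing.
receive_count = int(attributes["ApproximateReceiveCount"])

print(sent_at.isoformat(), first_received_at - sent_at, receive_count)
```

Because ApproximateReceiveCount is typed as str, code that implements a retry threshold would need an explicit int() cast like the one above.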

class SqsModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

class SqsModel(BaseModel):
    Records: Sequence[SqsRecordModel]

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var Records : Sequence[SqsRecordModel]

class SqsMsgAttributeModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

class SqsMsgAttributeModel(BaseModel):
    stringValue: Optional[str] = None
    binaryValue: Optional[str] = None
    stringListValues: List[str] = []
    binaryListValues: List[str] = []
    dataType: str

    # context on why it's commented: https://github.com/aws-powertools/powertools-lambda-python/pull/118
    # Amazon SQS supports the logical data types String, Number, and Binary with optional custom data type
    # labels with the format .custom-data-type.
    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes
    # @validator("dataType")
    # def valid_type(cls, v):  # noqa: VNE001,ERA001 # noqa: ERA001
    #     pattern = re.compile("Number.*|String.*|Binary.*") # noqa: ERA001
    #     if not pattern.match(v): # noqa: ERA001
    #         raise TypeError("data type is invalid") # noqa: ERA001
    #     return v # noqa: ERA001
    #
    # # validate that dataType and value are not None and match
    # @root_validator
    # def check_str_and_binary_values(cls, values): # noqa: ERA001
    #     binary_val, str_val = values.get("binaryValue", ""), values.get("stringValue", "") # noqa: ERA001
    #     data_type = values.get("dataType") # noqa: ERA001
    #     if not str_val and not binary_val: # noqa: ERA001
    #         raise TypeError("both binaryValue and stringValue are missing") # noqa: ERA001
    #     if data_type.startswith("Binary") and not binary_val: # noqa: ERA001
    #         raise TypeError("binaryValue is missing") # noqa: ERA001
    #     if (data_type.startswith("String") or data_type.startswith("Number")) and not str_val: # noqa: ERA001
    #         raise TypeError("stringValue is missing") # noqa: ERA001
    #     return values # noqa: ERA001

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var binaryListValues : List[str]
var binaryValue : Optional[str]
var dataType : str
var stringListValues : List[str]
var stringValue : Optional[str]
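
The validators in the source above are commented out, so the model itself does not enforce any relationship between dataType and the value fields. For callers who want those checks anyway, the following stdlib-only sketch mirrors the commented-out logic as a plain function. The function name check_message_attribute and its parameters are hypothetical, and it is not how the library validates attributes.

```python
import re
from typing import Optional

# Same pattern as the commented-out validator: String, Number, or Binary,
# optionally followed by a custom label such as "String.custom".
_DATA_TYPE_PATTERN = re.compile("Number.*|String.*|Binary.*")


def check_message_attribute(
    data_type: str,
    string_value: Optional[str] = None,
    binary_value: Optional[str] = None,
) -> None:
    """Raise TypeError if the attribute fails the checks; return None otherwise.

    Hypothetical helper that reproduces the disabled validators.
    """
    if not _DATA_TYPE_PATTERN.match(data_type):
        raise TypeError("data type is invalid")
    if not string_value and not binary_value:
        raise TypeError("both binaryValue and stringValue are missing")
    if data_type.startswith("Binary") and not binary_value:
        raise TypeError("binaryValue is missing")
    if data_type.startswith(("String", "Number")) and not string_value:
        raise TypeError("stringValue is missing")


# These calls pass silently.
check_message_attribute("String.custom", string_value="hello")
check_message_attribute("Binary", binary_value="aGVsbG8=")
```

Running such a check after parsing, rather than inside the model, keeps the model tolerant of whatever SQS delivers while still letting a handler reject attributes it cannot use.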

class SqsRecordModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

class SqsRecordModel(BaseModel):
    messageId: str
    receiptHandle: str
    body: Union[str, Type[BaseModel], BaseModel]
    attributes: SqsAttributesModel
    messageAttributes: Dict[str, SqsMsgAttributeModel]
    md5OfBody: str
    md5OfMessageAttributes: Optional[str] = None
    eventSource: Literal["aws:sqs"]
    eventSourceARN: str
    awsRegion: str

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var attributes : SqsAttributesModel
var awsRegion : str
var body : Union[str, Type[pydantic.main.BaseModel], pydantic.main.BaseModel]
var eventSource : Literal['aws:sqs']
var eventSourceARN : str
var md5OfBody : str
var md5OfMessageAttributes : Optional[str]
var messageAttributes : Dict[str, SqsMsgAttributeModel]
var messageId : str
var receiptHandle : str
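
The body field is typed as Union[str, Type[BaseModel], BaseModel], which suggests the raw body arrives as a string and may be swapped for a parsed model by a subclass. As a rough stdlib-only illustration of the record shape, the sketch below walks a hand-written event, checks the eventSource literal, and decodes each body. It assumes the producer sent JSON; the function name decode_bodies and every value in sample_event are made up for this example.

```python
import json
from typing import Any, Dict, List


def decode_bodies(event: Dict[str, Any]) -> List[Any]:
    """Return the JSON-decoded body of every record in an SQS event dict.

    Hypothetical helper; assumes each body is a JSON document.
    """
    decoded = []
    for record in event["Records"]:
        # SqsRecordModel pins eventSource to the literal "aws:sqs".
        if record["eventSource"] != "aws:sqs":
            raise ValueError(f"unexpected event source: {record['eventSource']}")
        decoded.append(json.loads(record["body"]))
    return decoded


# Hand-written sample with the fields SqsRecordModel declares; all values are illustrative.
sample_event = {
    "Records": [
        {
            "messageId": "example-message-id",
            "receiptHandle": "example-receipt-handle",
            "body": '{"order_id": 42}',
            "attributes": {
                "ApproximateReceiveCount": "1",
                "ApproximateFirstReceiveTimestamp": "1545082649185",
                "SenderId": "example-sender-id",
                "SentTimestamp": "1545082649183",
            },
            "messageAttributes": {},
            "md5OfBody": "example-md5",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:example-queue",
            "awsRegion": "us-east-1",
        }
    ]
}

print(decode_bodies(sample_event))
```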