Module aws_lambda_powertools.utilities.data_classes.sqs_event

Expand source code
from typing import Any, Dict, Iterator, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper

class SQSRecordAttributes(DictWrapper):
    def aws_trace_header(self) -> Optional[str]:
        """Returns the AWS X-Ray trace header string."""
        return self.get("AWSTraceHeader")

    def approximate_receive_count(self) -> str:
        """Returns the number of times a message has been received across all queues but not deleted."""
        return self["ApproximateReceiveCount"]

    def sent_timestamp(self) -> str:
        """Returns the time the message was sent to the queue (epoch time in milliseconds)."""
        return self["SentTimestamp"]

    def sender_id(self) -> str:
        """For an IAM user, returns the IAM user ID, For an IAM role, returns the IAM role ID"""
        return self["SenderId"]

    def approximate_first_receive_timestamp(self) -> str:
        """Returns the time the message was first received from the queue (epoch time in milliseconds)."""
        return self["ApproximateFirstReceiveTimestamp"]

    def sequence_number(self) -> Optional[str]:
        """The large, non-consecutive number that Amazon SQS assigns to each message."""
        return self.get("SequenceNumber")

    def message_group_id(self) -> Optional[str]:
        """The tag that specifies that a message belongs to a specific message group.

        Messages that belong to the same message group are always processed one by one, in a
        strict order relative to the message group (however, messages that belong to different
        message groups might be processed out of order)."""
        return self.get("MessageGroupId")

    def message_deduplication_id(self) -> Optional[str]:
        """The token used for deduplication of sent messages.

        If a message with a particular message deduplication ID is sent successfully, any messages sent
        with the same message deduplication ID are accepted successfully but aren't delivered during
        the 5-minute deduplication interval."""
        return self.get("MessageDeduplicationId")

class SQSMessageAttribute(DictWrapper):
    """The user-specified message attribute value."""

    def string_value(self) -> Optional[str]:
        """Strings are Unicode with UTF-8 binary encoding."""
        return self["stringValue"]

    def binary_value(self) -> Optional[str]:
        """Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.

        Base64-encoded binary data object"""
        return self["binaryValue"]

    def data_type(self) -> str:
        """The message attribute data type. Supported types include `String`, `Number`, and `Binary`."""
        return self["dataType"]

class SQSMessageAttributes(Dict[str, SQSMessageAttribute]):
    def __getitem__(self, key: str) -> Optional[SQSMessageAttribute]:  # type: ignore
        item = super().get(key)
        return None if item is None else SQSMessageAttribute(item)  # type: ignore

class SQSRecord(DictWrapper):
    """An Amazon SQS message"""

    def message_id(self) -> str:
        """A unique identifier for the message.

        A messageId is considered unique across all AWS accounts for an extended period of time."""
        return self["messageId"]

    def receipt_handle(self) -> str:
        """An identifier associated with the act of receiving the message.

        A new receipt handle is returned every time you receive a message. When deleting a message,
        you provide the last received receipt handle to delete the message."""
        return self["receiptHandle"]

    def body(self) -> str:
        """The message's contents (not URL-encoded)."""
        return self["body"]

    def json_body(self) -> Any:
        """Deserializes JSON string available in 'body' property


        **Strict typing**

        Caller controls the type as we can't use recursive generics here.

        JSON Union types would force caller to have to cast a type. Instead,
        we choose Any to ease ergonomics and other tools receiving this data.


        **Type deserialized data from JSON string**

        data: dict = record.json_body  # {"telemetry": [], ...}
        # or
        data: list = record.json_body  # ["telemetry_values"]
        if self._json_data is None:
            self._json_data = self._json_deserializer(self["body"])
        return self._json_data

    def attributes(self) -> SQSRecordAttributes:
        """A map of the attributes requested in ReceiveMessage to their respective values."""
        return SQSRecordAttributes(self["attributes"])

    def message_attributes(self) -> SQSMessageAttributes:
        """Each message attribute consists of a Name, Type, and Value."""
        return SQSMessageAttributes(self["messageAttributes"])

    def md5_of_body(self) -> str:
        """An MD5 digest of the non-URL-encoded message body string."""
        return self["md5OfBody"]

    def event_source(self) -> str:
        """The AWS service from which the SQS record originated. For SQS, this is `aws:sqs`"""
        return self["eventSource"]

    def event_source_arn(self) -> str:
        """The Amazon Resource Name (ARN) of the event source"""
        return self["eventSourceARN"]

    def aws_region(self) -> str:
        """aws region eg: us-east-1"""
        return self["awsRegion"]

    def queue_url(self) -> str:
        """The URL of the queue."""
        arn_parts = self["eventSourceARN"].split(":")
        region = arn_parts[3]
        account_id = arn_parts[4]
        queue_name = arn_parts[5]

        queue_url = f"https://sqs.{region}{account_id}/{queue_name}"

        return queue_url

class SQSEvent(DictWrapper):
    """SQS Event


    def records(self) -> Iterator[SQSRecord]:
        for record in self["Records"]:
            yield SQSRecord(data=record, json_deserializer=self._json_deserializer)


class SQSEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

SQS Event



data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj`, by default json.loads
Expand source code
class SQSEvent(DictWrapper):
    """SQS Event


    def records(self) -> Iterator[SQSRecord]:
        for record in self["Records"]:
            yield SQSRecord(data=record, json_deserializer=self._json_deserializer)


  • DictWrapper

Instance variables

var records : Iterator[SQSRecord]
Expand source code
def records(self) -> Iterator[SQSRecord]:
    for record in self["Records"]:
        yield SQSRecord(data=record, json_deserializer=self._json_deserializer)

Inherited members

class SQSMessageAttribute (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

The user-specified message attribute value.


data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj`, by default json.loads
Expand source code
class SQSMessageAttribute(DictWrapper):
    """The user-specified message attribute value."""

    def string_value(self) -> Optional[str]:
        """Strings are Unicode with UTF-8 binary encoding."""
        return self["stringValue"]

    def binary_value(self) -> Optional[str]:
        """Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.

        Base64-encoded binary data object"""
        return self["binaryValue"]

    def data_type(self) -> str:
        """The message attribute data type. Supported types include `String`, `Number`, and `Binary`."""
        return self["dataType"]


  • DictWrapper

Instance variables

var binary_value : Optional[str]

Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.

Base64-encoded binary data object

Expand source code
def binary_value(self) -> Optional[str]:
    """Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.

    Base64-encoded binary data object"""
    return self["binaryValue"]
var data_type : str

The message attribute data type. Supported types include String, Number, and Binary.

Expand source code
def data_type(self) -> str:
    """The message attribute data type. Supported types include `String`, `Number`, and `Binary`."""
    return self["dataType"]
var string_value : Optional[str]

Strings are Unicode with UTF-8 binary encoding.

Expand source code
def string_value(self) -> Optional[str]:
    """Strings are Unicode with UTF-8 binary encoding."""
    return self["stringValue"]

Inherited members

class SQSMessageAttributes (*args, **kwargs)

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

Expand source code
class SQSMessageAttributes(Dict[str, SQSMessageAttribute]):
    def __getitem__(self, key: str) -> Optional[SQSMessageAttribute]:  # type: ignore
        item = super().get(key)
        return None if item is None else SQSMessageAttribute(item)  # type: ignore


  • builtins.dict
  • typing.Generic
class SQSRecord (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

An Amazon SQS message


data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj`, by default json.loads
Expand source code
class SQSRecord(DictWrapper):
    """An Amazon SQS message"""

    def message_id(self) -> str:
        """A unique identifier for the message.

        A messageId is considered unique across all AWS accounts for an extended period of time."""
        return self["messageId"]

    def receipt_handle(self) -> str:
        """An identifier associated with the act of receiving the message.

        A new receipt handle is returned every time you receive a message. When deleting a message,
        you provide the last received receipt handle to delete the message."""
        return self["receiptHandle"]

    def body(self) -> str:
        """The message's contents (not URL-encoded)."""
        return self["body"]

    def json_body(self) -> Any:
        """Deserializes JSON string available in 'body' property


        **Strict typing**

        Caller controls the type as we can't use recursive generics here.

        JSON Union types would force caller to have to cast a type. Instead,
        we choose Any to ease ergonomics and other tools receiving this data.


        **Type deserialized data from JSON string**

        data: dict = record.json_body  # {"telemetry": [], ...}
        # or
        data: list = record.json_body  # ["telemetry_values"]
        if self._json_data is None:
            self._json_data = self._json_deserializer(self["body"])
        return self._json_data

    def attributes(self) -> SQSRecordAttributes:
        """A map of the attributes requested in ReceiveMessage to their respective values."""
        return SQSRecordAttributes(self["attributes"])

    def message_attributes(self) -> SQSMessageAttributes:
        """Each message attribute consists of a Name, Type, and Value."""
        return SQSMessageAttributes(self["messageAttributes"])

    def md5_of_body(self) -> str:
        """An MD5 digest of the non-URL-encoded message body string."""
        return self["md5OfBody"]

    def event_source(self) -> str:
        """The AWS service from which the SQS record originated. For SQS, this is `aws:sqs`"""
        return self["eventSource"]

    def event_source_arn(self) -> str:
        """The Amazon Resource Name (ARN) of the event source"""
        return self["eventSourceARN"]

    def aws_region(self) -> str:
        """aws region eg: us-east-1"""
        return self["awsRegion"]

    def queue_url(self) -> str:
        """The URL of the queue."""
        arn_parts = self["eventSourceARN"].split(":")
        region = arn_parts[3]
        account_id = arn_parts[4]
        queue_name = arn_parts[5]

        queue_url = f"https://sqs.{region}{account_id}/{queue_name}"

        return queue_url


  • DictWrapper

Instance variables

var attributesSQSRecordAttributes

A map of the attributes requested in ReceiveMessage to their respective values.

Expand source code
def attributes(self) -> SQSRecordAttributes:
    """A map of the attributes requested in ReceiveMessage to their respective values."""
    return SQSRecordAttributes(self["attributes"])
var aws_region : str

aws region eg: us-east-1

Expand source code
def aws_region(self) -> str:
    """aws region eg: us-east-1"""
    return self["awsRegion"]
var body : str

The message's contents (not URL-encoded).

Expand source code
def body(self) -> str:
    """The message's contents (not URL-encoded)."""
    return self["body"]
var event_source : str

The AWS service from which the SQS record originated. For SQS, this is aws:sqs

Expand source code
def event_source(self) -> str:
    """The AWS service from which the SQS record originated. For SQS, this is `aws:sqs`"""
    return self["eventSource"]
var event_source_arn : str

The Amazon Resource Name (ARN) of the event source

Expand source code
def event_source_arn(self) -> str:
    """The Amazon Resource Name (ARN) of the event source"""
    return self["eventSourceARN"]
var json_body : Any

Deserializes JSON string available in 'body' property


Strict typing

Caller controls the type as we can't use recursive generics here.

JSON Union types would force caller to have to cast a type. Instead, we choose Any to ease ergonomics and other tools receiving this data.


Type deserialized data from JSON string

data: dict = record.json_body  # {"telemetry": [], ...}
# or
data: list = record.json_body  # ["telemetry_values"]
Expand source code
def json_body(self) -> Any:
    """Deserializes JSON string available in 'body' property


    **Strict typing**

    Caller controls the type as we can't use recursive generics here.

    JSON Union types would force caller to have to cast a type. Instead,
    we choose Any to ease ergonomics and other tools receiving this data.


    **Type deserialized data from JSON string**

    data: dict = record.json_body  # {"telemetry": [], ...}
    # or
    data: list = record.json_body  # ["telemetry_values"]
    if self._json_data is None:
        self._json_data = self._json_deserializer(self["body"])
    return self._json_data
var md5_of_body : str

An MD5 digest of the non-URL-encoded message body string.

Expand source code
def md5_of_body(self) -> str:
    """An MD5 digest of the non-URL-encoded message body string."""
    return self["md5OfBody"]
var message_attributesSQSMessageAttributes

Each message attribute consists of a Name, Type, and Value.

Expand source code
def message_attributes(self) -> SQSMessageAttributes:
    """Each message attribute consists of a Name, Type, and Value."""
    return SQSMessageAttributes(self["messageAttributes"])
var message_id : str

A unique identifier for the message.

A messageId is considered unique across all AWS accounts for an extended period of time.

Expand source code
def message_id(self) -> str:
    """A unique identifier for the message.

    A messageId is considered unique across all AWS accounts for an extended period of time."""
    return self["messageId"]
var queue_url : str

The URL of the queue.

Expand source code
def queue_url(self) -> str:
    """The URL of the queue."""
    arn_parts = self["eventSourceARN"].split(":")
    region = arn_parts[3]
    account_id = arn_parts[4]
    queue_name = arn_parts[5]

    queue_url = f"https://sqs.{region}{account_id}/{queue_name}"

    return queue_url
var receipt_handle : str

An identifier associated with the act of receiving the message.

A new receipt handle is returned every time you receive a message. When deleting a message, you provide the last received receipt handle to delete the message.

Expand source code
def receipt_handle(self) -> str:
    """An identifier associated with the act of receiving the message.

    A new receipt handle is returned every time you receive a message. When deleting a message,
    you provide the last received receipt handle to delete the message."""
    return self["receiptHandle"]

Inherited members

class SQSRecordAttributes (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides a single read only access to a wrapper dict


data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj`, by default json.loads
Expand source code
class SQSRecordAttributes(DictWrapper):
    def aws_trace_header(self) -> Optional[str]:
        """Returns the AWS X-Ray trace header string."""
        return self.get("AWSTraceHeader")

    def approximate_receive_count(self) -> str:
        """Returns the number of times a message has been received across all queues but not deleted."""
        return self["ApproximateReceiveCount"]

    def sent_timestamp(self) -> str:
        """Returns the time the message was sent to the queue (epoch time in milliseconds)."""
        return self["SentTimestamp"]

    def sender_id(self) -> str:
        """For an IAM user, returns the IAM user ID, For an IAM role, returns the IAM role ID"""
        return self["SenderId"]

    def approximate_first_receive_timestamp(self) -> str:
        """Returns the time the message was first received from the queue (epoch time in milliseconds)."""
        return self["ApproximateFirstReceiveTimestamp"]

    def sequence_number(self) -> Optional[str]:
        """The large, non-consecutive number that Amazon SQS assigns to each message."""
        return self.get("SequenceNumber")

    def message_group_id(self) -> Optional[str]:
        """The tag that specifies that a message belongs to a specific message group.

        Messages that belong to the same message group are always processed one by one, in a
        strict order relative to the message group (however, messages that belong to different
        message groups might be processed out of order)."""
        return self.get("MessageGroupId")

    def message_deduplication_id(self) -> Optional[str]:
        """The token used for deduplication of sent messages.

        If a message with a particular message deduplication ID is sent successfully, any messages sent
        with the same message deduplication ID are accepted successfully but aren't delivered during
        the 5-minute deduplication interval."""
        return self.get("MessageDeduplicationId")


  • DictWrapper

Instance variables

var approximate_first_receive_timestamp : str

Returns the time the message was first received from the queue (epoch time in milliseconds).

Expand source code
def approximate_first_receive_timestamp(self) -> str:
    """Returns the time the message was first received from the queue (epoch time in milliseconds)."""
    return self["ApproximateFirstReceiveTimestamp"]
var approximate_receive_count : str

Returns the number of times a message has been received across all queues but not deleted.

Expand source code
def approximate_receive_count(self) -> str:
    """Returns the number of times a message has been received across all queues but not deleted."""
    return self["ApproximateReceiveCount"]
var aws_trace_header : Optional[str]

Returns the AWS X-Ray trace header string.

Expand source code
def aws_trace_header(self) -> Optional[str]:
    """Returns the AWS X-Ray trace header string."""
    return self.get("AWSTraceHeader")
var message_deduplication_id : Optional[str]

The token used for deduplication of sent messages.

If a message with a particular message deduplication ID is sent successfully, any messages sent with the same message deduplication ID are accepted successfully but aren't delivered during the 5-minute deduplication interval.

Expand source code
def message_deduplication_id(self) -> Optional[str]:
    """The token used for deduplication of sent messages.

    If a message with a particular message deduplication ID is sent successfully, any messages sent
    with the same message deduplication ID are accepted successfully but aren't delivered during
    the 5-minute deduplication interval."""
    return self.get("MessageDeduplicationId")
var message_group_id : Optional[str]

The tag that specifies that a message belongs to a specific message group.

Messages that belong to the same message group are always processed one by one, in a strict order relative to the message group (however, messages that belong to different message groups might be processed out of order).

Expand source code
def message_group_id(self) -> Optional[str]:
    """The tag that specifies that a message belongs to a specific message group.

    Messages that belong to the same message group are always processed one by one, in a
    strict order relative to the message group (however, messages that belong to different
    message groups might be processed out of order)."""
    return self.get("MessageGroupId")
var sender_id : str

For an IAM user, returns the IAM user ID, For an IAM role, returns the IAM role ID

Expand source code
def sender_id(self) -> str:
    """For an IAM user, returns the IAM user ID, For an IAM role, returns the IAM role ID"""
    return self["SenderId"]
var sent_timestamp : str

Returns the time the message was sent to the queue (epoch time in milliseconds).

Expand source code
def sent_timestamp(self) -> str:
    """Returns the time the message was sent to the queue (epoch time in milliseconds)."""
    return self["SentTimestamp"]
var sequence_number : Optional[str]

The large, non-consecutive number that Amazon SQS assigns to each message.

Expand source code
def sequence_number(self) -> Optional[str]:
    """The large, non-consecutive number that Amazon SQS assigns to each message."""
    return self.get("SequenceNumber")

Inherited members