Module aws_lambda_powertools.utilities.data_classes.active_mq_event

Classes

class ActiveMQEvent (data: Dict[str, Any])

Represents an Active MQ event sent to Lambda

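Documentation:

  • https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
  • https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/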

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class ActiveMQEvent(DictWrapper):
    """Represents an Active MQ event sent to Lambda

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
    - https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/

    Parameters
    ----------
    data : Dict[str, Any]
        Lambda Event Source Event payload
    json_deserializer : Callable, optional
        function to deserialize str, bytes, bytearray containing a JSON document to a
        Python obj; it is handed to ``DictWrapper`` and, through ``messages``, to every
        ``ActiveMQMessage`` built from this event
    """

    def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Any] = None):
        # Accept and forward the deserializer: without this parameter a caller had no
        # way to choose the deserializer that `messages` passes on to each message.
        # Typed as Optional[Any] to avoid depending on a typing name this class does
        # not already use; the value is expected to be a callable.
        super().__init__(data, json_deserializer=json_deserializer)
        # Lazily created iterator backing the `message` property.
        self._messages: Optional[Iterator[ActiveMQMessage]] = None

    @property
    def event_source(self) -> str:
        """The ``eventSource`` field of the event payload"""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str:
        """The Amazon Resource Name (ARN) of the event source"""
        return self["eventSourceArn"]

    @property
    def messages(self) -> Iterator[ActiveMQMessage]:
        """Yield an ``ActiveMQMessage`` for each entry of the ``messages`` field

        Every access starts a fresh iteration over the payload.
        """
        for record in self["messages"]:
            yield ActiveMQMessage(record, json_deserializer=self._json_deserializer)

    @property
    def message(self) -> ActiveMQMessage:
        """
        Returns the next ActiveMQ message using an iterator

        The iterator is created from ``messages`` on first access and kept on the
        instance, so each access advances to the following message.

        Returns
        -------
        ActiveMQMessage
            The next activemq message.

        Raises
        ------
        StopIteration
            If there are no more records available.

        """
        if self._messages is None:
            self._messages = self.messages
        return next(self._messages)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

prop event_source : str
Expand source code
@property
def event_source(self) -> str:
    """The ``eventSource`` field of the event payload"""
    source = self["eventSource"]
    return source
prop event_source_arn : str

The Amazon Resource Name (ARN) of the event source

Expand source code
@property
def event_source_arn(self) -> str:
    """ARN (Amazon Resource Name) identifying the event source"""
    arn = self["eventSourceArn"]
    return arn
prop message : ActiveMQMessage

Returns the next ActiveMQ message using an iterator

Returns

ActiveMQMessage
The next activemq message.

Raises

StopIteration
If there are no more records available.
Expand source code
@property
def message(self) -> ActiveMQMessage:
    """
    Hand out the event's ActiveMQ messages one at a time

    The underlying iterator is taken from ``messages`` on first access and
    stored on the instance; every access advances it by one.

    Returns
    -------
    ActiveMQMessage
        The next activemq message.

    Raises
    ------
    StopIteration
        If there are no more records available.

    """
    message_iter = self._messages
    if message_iter is None:
        # First access: create the iterator and remember it for later calls.
        message_iter = self._messages = self.messages
    return next(message_iter)
prop messages : Iterator[ActiveMQMessage]
Expand source code
@property
def messages(self) -> Iterator[ActiveMQMessage]:
    """Yield an ``ActiveMQMessage`` for each entry of the ``messages`` field"""
    for raw_message in self["messages"]:
        wrapped = ActiveMQMessage(raw_message, json_deserializer=self._json_deserializer)
        yield wrapped

Inherited members

class ActiveMQMessage (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides read-only access to a wrapped dict

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class ActiveMQMessage(DictWrapper):
    """A single entry of the ``messages`` list of an Active MQ event"""

    @property
    def message_id(self) -> str:
        """The message's unique identifier (``messageID`` field)"""
        identifier = self["messageID"]
        return identifier

    @property
    def message_type(self) -> str:
        """The ``messageType`` field of the message"""
        kind = self["messageType"]
        return kind

    @property
    def data(self) -> str:
        """The ``data`` field as received; ``decoded_data`` gives the base64-decoded form"""
        payload = self["data"]
        return payload

    @property
    def decoded_data(self) -> str:
        """The ``data`` field run through ``base64_decode``, as a str"""
        encoded = self.data
        return base64_decode(encoded)

    @cached_property
    def json_data(self) -> Any:
        """``decoded_data`` parsed by the JSON deserializer; computed once, then cached"""
        decoded = self.decoded_data
        return self._json_deserializer(decoded)

    @property
    def connection_id(self) -> str:
        """The ``connectionId`` field of the message"""
        connection = self["connectionId"]
        return connection

    @property
    def redelivered(self) -> bool:
        """Whether the message is being resent to the consumer"""
        resent = self["redelivered"]
        return resent

    @property
    def timestamp(self) -> int:
        """Message time, in milliseconds"""
        millis = self["timestamp"]
        return millis

    @property
    def broker_in_time(self) -> int:
        """When the message arrived at the broker, as a time stamp in milliseconds"""
        arrived_at = self["brokerInTime"]
        return arrived_at

    @property
    def broker_out_time(self) -> int:
        """When the message left the broker, as a time stamp in milliseconds"""
        left_at = self["brokerOutTime"]
        return left_at

    @property
    def properties(self) -> dict:
        """Custom properties carried by the message"""
        custom = self["properties"]
        return custom

    @property
    def destination_physicalname(self) -> str:
        """The ``physicalName`` entry of the message's ``destination`` field"""
        destination = self["destination"]
        return destination["physicalName"]

    @property
    def delivery_mode(self) -> Optional[int]:
        """Delivery mode (persistent or non-persistent), if ``deliveryMode`` is present"""
        mode = self.get("deliveryMode")
        return mode

    @property
    def correlation_id(self) -> Optional[str]:
        """User-defined correlation id, if ``correlationID`` is present"""
        correlation = self.get("correlationID")
        return correlation

    @property
    def reply_to(self) -> Optional[str]:
        """User-defined reply-to value, if ``replyTo`` is present"""
        reply_target = self.get("replyTo")
        return reply_target

    @property
    def get_type(self) -> Optional[str]:
        """User-defined message type, if ``type`` is present"""
        user_type = self.get("type")
        return user_type

    @property
    def expiration(self) -> Optional[int]:
        """Expiration attribute, in milliseconds, if ``expiration`` is present"""
        expires = self.get("expiration")
        return expires

    @property
    def priority(self) -> Optional[int]:
        """
        Message priority, if the ``priority`` field is present

        JMS priorities run from 0 (lowest) to 9 (highest). Clients should treat
        0-4 as gradations of normal priority and 5-9 as gradations of expedited
        priority.

        A JMS provider is not required to order messages strictly by priority,
        but it should do its best to deliver expedited messages ahead of normal
        ones.
        """
        level = self.get("priority")
        return level

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

prop broker_in_time : int

Time stamp (in milliseconds) for when the message arrived at the broker.

Expand source code
@property
def broker_in_time(self) -> int:
    """When the message arrived at the broker, as a time stamp in milliseconds"""
    arrived_at = self["brokerInTime"]
    return arrived_at
prop broker_out_time : int

Time stamp (in milliseconds) for when the message left the broker.

Expand source code
@property
def broker_out_time(self) -> int:
    """When the message left the broker, as a time stamp in milliseconds"""
    left_at = self["brokerOutTime"]
    return left_at
prop connection_id : str
Expand source code
@property
def connection_id(self) -> str:
    """The ``connectionId`` field of the message"""
    connection = self["connectionId"]
    return connection
prop correlation_id : Optional[str]

User defined correlation id

Expand source code
@property
def correlation_id(self) -> Optional[str]:
    """User-defined correlation id, if ``correlationID`` is present"""
    correlation = self.get("correlationID")
    return correlation
prop data : str
Expand source code
@property
def data(self) -> str:
    """The ``data`` field as received; ``decoded_data`` gives the base64-decoded form"""
    payload = self["data"]
    return payload
prop decoded_data : str

Decodes the data as a str

Expand source code
@property
def decoded_data(self) -> str:
    """The ``data`` field run through ``base64_decode``, as a str"""
    encoded = self.data
    return base64_decode(encoded)
prop delivery_mode : Optional[int]

persistent or non-persistent delivery

Expand source code
@property
def delivery_mode(self) -> Optional[int]:
    """Delivery mode (persistent or non-persistent), if ``deliveryMode`` is present"""
    mode = self.get("deliveryMode")
    return mode
prop destination_physicalname : str
Expand source code
@property
def destination_physicalname(self) -> str:
    """The ``physicalName`` entry of the message's ``destination`` field"""
    destination = self["destination"]
    return destination["physicalName"]
prop expiration : Optional[int]

Expiration attribute whose value is given in milliseconds

Expand source code
@property
def expiration(self) -> Optional[int]:
    """Expiration attribute, in milliseconds, if ``expiration`` is present"""
    expires = self.get("expiration")
    return expires
prop get_type : Optional[str]

User defined message type

Expand source code
@property
def get_type(self) -> Optional[str]:
    """User-defined message type, if ``type`` is present"""
    user_type = self.get("type")
    return user_type
var json_data
Expand source code
def __get__(self, instance, owner=None):
    """Return the cached value for ``instance``, computing and storing it on first access.

    When accessed on the class (``instance is None``) the descriptor itself is
    returned. Raises ``TypeError`` when ``attrname`` is unset, when the
    instance has no ``__dict__``, or when storing into that ``__dict__``
    raises ``TypeError``.
    """
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        # The instance's own __dict__ is used as the cache storage.
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    # _NOT_FOUND is the lookup default and is compared by identity, so only a
    # missing key triggers computation (presumably it is a unique sentinel object).
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        val = self.func(instance)
        try:
            cache[self.attrname] = val
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
    return val
prop message_id : str

Unique identifier for the message

Expand source code
@property
def message_id(self) -> str:
    """The message's unique identifier (``messageID`` field)"""
    identifier = self["messageID"]
    return identifier
prop message_type : str
Expand source code
@property
def message_type(self) -> str:
    """The ``messageType`` field of the message"""
    kind = self["messageType"]
    return kind
prop priority : Optional[int]

JMS defines a ten-level priority value, with 0 as the lowest priority and 9 as the highest. In addition, clients should consider priorities 0-4 as gradations of normal priority and priorities 5-9 as gradations of expedited priority.

JMS does not require that a provider strictly implement priority ordering of messages; however, it should do its best to deliver expedited messages ahead of normal messages.

Expand source code
@property
def priority(self) -> Optional[int]:
    """
    Message priority, if the ``priority`` field is present

    JMS priorities run from 0 (lowest) to 9 (highest). Clients should treat
    0-4 as gradations of normal priority and 5-9 as gradations of expedited
    priority.

    A JMS provider is not required to order messages strictly by priority,
    but it should do its best to deliver expedited messages ahead of normal
    ones.
    """
    level = self.get("priority")
    return level
prop properties : dict

Custom properties

Expand source code
@property
def properties(self) -> dict:
    """Custom properties carried by the message"""
    custom = self["properties"]
    return custom
prop redelivered : bool

true if the message is being resent to the consumer

Expand source code
@property
def redelivered(self) -> bool:
    """Whether the message is being resent to the consumer"""
    resent = self["redelivered"]
    return resent
prop reply_to : Optional[str]

User defined reply to

Expand source code
@property
def reply_to(self) -> Optional[str]:
    """User-defined reply-to value, if ``replyTo`` is present"""
    reply_target = self.get("replyTo")
    return reply_target
prop timestamp : int

Time in milliseconds.

Expand source code
@property
def timestamp(self) -> int:
    """Message time, in milliseconds"""
    millis = self["timestamp"]
    return millis

Inherited members