Module aws_lambda_powertools.utilities.data_classes.active_mq_event

Expand source code
from typing import Any, Callable, Dict, Iterator, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.shared_functions import base64_decode


class ActiveMQMessage(DictWrapper):
    """Read-only view over one entry of an Active MQ event's ``messages`` list."""

    @property
    def message_id(self) -> str:
        """Unique identifier of the message (``messageID`` key)."""
        return self["messageID"]

    @property
    def message_type(self) -> str:
        """Value stored under the ``messageType`` key."""
        return self["messageType"]

    @property
    def data(self) -> str:
        """Raw ``data`` field as received; ``decoded_data`` base64-decodes it."""
        return self["data"]

    @property
    def decoded_data(self) -> str:
        """Return ``data`` after base64 decoding, as a str."""
        encoded = self.data
        return base64_decode(encoded)

    @property
    def json_data(self) -> Any:
        """Return ``decoded_data`` parsed as JSON.

        The parsed value is stored on the instance and reused on later
        accesses for as long as it is not ``None``.
        """
        cached = self._json_data
        if cached is not None:
            return cached
        self._json_data = self._json_deserializer(self.decoded_data)
        return self._json_data

    @property
    def connection_id(self) -> str:
        """Value stored under the ``connectionId`` key."""
        return self["connectionId"]

    @property
    def redelivered(self) -> bool:
        """True when the message is being resent to the consumer."""
        return self["redelivered"]

    @property
    def timestamp(self) -> int:
        """Message time, in milliseconds."""
        return self["timestamp"]

    @property
    def broker_in_time(self) -> int:
        """Time stamp (milliseconds) at which the message arrived at the broker."""
        return self["brokerInTime"]

    @property
    def broker_out_time(self) -> int:
        """Time stamp (milliseconds) at which the message left the broker."""
        return self["brokerOutTime"]

    @property
    def properties(self) -> dict:
        """Custom properties attached to the message."""
        return self["properties"]

    @property
    def destination_physicalname(self) -> str:
        """``physicalname`` entry of the message's ``destination`` mapping."""
        destination = self["destination"]
        return destination["physicalname"]

    @property
    def delivery_mode(self) -> Optional[int]:
        """Persistent or non-persistent delivery; ``None`` when the key is absent."""
        return self.get("deliveryMode", None)

    @property
    def correlation_id(self) -> Optional[str]:
        """User-defined correlation id; ``None`` when the key is absent."""
        return self.get("correlationID", None)

    @property
    def reply_to(self) -> Optional[str]:
        """User-defined reply-to value; ``None`` when the key is absent."""
        return self.get("replyTo", None)

    @property
    def get_type(self) -> Optional[str]:
        """User-defined message type (``type`` key); ``None`` when the key is absent."""
        return self.get("type", None)

    @property
    def expiration(self) -> Optional[int]:
        """Expiration attribute, given in milliseconds; ``None`` when the key is absent."""
        return self.get("expiration", None)

    @property
    def priority(self) -> Optional[int]:
        """
        Message priority, or ``None`` when the key is absent.

        JMS defines ten priority levels: 0 is the lowest and 9 the highest.
        Clients should treat 0-4 as gradations of normal priority and 5-9 as
        gradations of expedited priority.

        A JMS provider is not required to implement strict priority ordering,
        but it should do its best to deliver expedited messages ahead of
        normal ones.
        """
        return self.get("priority", None)


class ActiveMQEvent(DictWrapper):
    """Represents an Active MQ event sent to Lambda

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
    - https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/

    Parameters
    ----------
    data : Dict[str, Any]
        Lambda Event Source Event payload
    json_deserializer : Callable, optional
        function to deserialize str, bytes, bytearray containing a JSON document
        to a Python obj. It is forwarded to the base class and, through
        ``messages``, to every ``ActiveMQMessage``. When omitted the base class
        default applies.
    """

    def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None):
        # Forward the deserializer instead of dropping it, so callers can
        # customise JSON parsing exactly as they can for ActiveMQMessage.
        super().__init__(data, json_deserializer=json_deserializer)
        # Lazily created iterator backing the `message` property.
        self._messages: Optional[Iterator[ActiveMQMessage]] = None

    @property
    def event_source(self) -> str:
        """Value stored under the ``eventSource`` key of the event."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str:
        """The Amazon Resource Name (ARN) of the event source"""
        return self["eventSourceArn"]

    @property
    def messages(self) -> Iterator[ActiveMQMessage]:
        """Lazily yield each entry of ``messages`` wrapped as an ``ActiveMQMessage``.

        Every access returns a fresh generator that starts from the first record.
        """
        for record in self["messages"]:
            yield ActiveMQMessage(record, json_deserializer=self._json_deserializer)

    @property
    def message(self) -> ActiveMQMessage:
        """
        Returns the next ActiveMQ message using an iterator

        The iterator is created on first access and kept on the instance, so
        successive accesses walk through the records one at a time.

        Returns
        -------
        ActiveMQMessage
            The next activemq message.

        Raises
        ------
        StopIteration
            If there are no more records available.

        """
        if self._messages is None:
            self._messages = self.messages
        return next(self._messages)

Classes

class ActiveMQEvent (data: Dict[str, Any])

Represents an Active MQ event sent to Lambda

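Documentation:

  • https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
  • https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/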

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class ActiveMQEvent(DictWrapper):
    """Represents an Active MQ event sent to Lambda

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
    - https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/

    Parameters
    ----------
    data : Dict[str, Any]
        Lambda Event Source Event payload
    json_deserializer : Callable, optional
        function to deserialize str, bytes, bytearray containing a JSON document
        to a Python obj. It is forwarded to the base class and, through
        ``messages``, to every ``ActiveMQMessage``. When omitted the base class
        default applies.
    """

    def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None):
        # Forward the deserializer instead of dropping it, so callers can
        # customise JSON parsing exactly as they can for ActiveMQMessage.
        super().__init__(data, json_deserializer=json_deserializer)
        # Lazily created iterator backing the `message` property.
        self._messages: Optional[Iterator[ActiveMQMessage]] = None

    @property
    def event_source(self) -> str:
        """Value stored under the ``eventSource`` key of the event."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str:
        """The Amazon Resource Name (ARN) of the event source"""
        return self["eventSourceArn"]

    @property
    def messages(self) -> Iterator[ActiveMQMessage]:
        """Lazily yield each entry of ``messages`` wrapped as an ``ActiveMQMessage``.

        Every access returns a fresh generator that starts from the first record.
        """
        for record in self["messages"]:
            yield ActiveMQMessage(record, json_deserializer=self._json_deserializer)

    @property
    def message(self) -> ActiveMQMessage:
        """
        Returns the next ActiveMQ message using an iterator

        The iterator is created on first access and kept on the instance, so
        successive accesses walk through the records one at a time.

        Returns
        -------
        ActiveMQMessage
            The next activemq message.

        Raises
        ------
        StopIteration
            If there are no more records available.

        """
        if self._messages is None:
            self._messages = self.messages
        return next(self._messages)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var event_source : str
Expand source code
@property
def event_source(self) -> str:
    """Value stored under the ``eventSource`` key of the event."""
    source = self["eventSource"]
    return source
var event_source_arn : str

The Amazon Resource Name (ARN) of the event source

Expand source code
@property
def event_source_arn(self) -> str:
    """Amazon Resource Name (ARN) of the event source (``eventSourceArn`` key)."""
    arn = self["eventSourceArn"]
    return arn
var message : ActiveMQMessage

Returns the next ActiveMQ message using an iterator

Returns

ActiveMQMessage
The next activemq message.

Raises

StopIteration
If there are no more records available.
Expand source code
@property
def message(self) -> ActiveMQMessage:
    """
    Return the next ActiveMQ message from an iterator kept on the instance.

    The iterator is taken from ``self.messages`` on first access and stored
    in ``self._messages``; later accesses continue from where it stopped.

    Returns
    -------
    ActiveMQMessage
        The next activemq message.

    Raises
    ------
    StopIteration
        If there are no more records available.

    """
    pending = self._messages
    if pending is None:
        # First access: create the iterator and remember it for later calls.
        pending = self._messages = self.messages
    return next(pending)
var messages : Iterator[ActiveMQMessage]
Expand source code
@property
def messages(self) -> Iterator[ActiveMQMessage]:
    """Lazily yield each entry of ``messages`` wrapped as an ``ActiveMQMessage``.

    The event's JSON deserializer is handed to every wrapper.
    """
    yield from (
        ActiveMQMessage(raw_message, json_deserializer=self._json_deserializer)
        for raw_message in self["messages"]
    )

Inherited members

class ActiveMQMessage (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides read-only access to a wrapped dict

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class ActiveMQMessage(DictWrapper):
    """Read-only view over one entry of an Active MQ event's ``messages`` list."""

    @property
    def message_id(self) -> str:
        """Unique identifier of the message (``messageID`` key)."""
        return self["messageID"]

    @property
    def message_type(self) -> str:
        """Value stored under the ``messageType`` key."""
        return self["messageType"]

    @property
    def data(self) -> str:
        """Raw ``data`` field as received; ``decoded_data`` base64-decodes it."""
        return self["data"]

    @property
    def decoded_data(self) -> str:
        """Return ``data`` after base64 decoding, as a str."""
        encoded = self.data
        return base64_decode(encoded)

    @property
    def json_data(self) -> Any:
        """Return ``decoded_data`` parsed as JSON.

        The parsed value is stored on the instance and reused on later
        accesses for as long as it is not ``None``.
        """
        cached = self._json_data
        if cached is not None:
            return cached
        self._json_data = self._json_deserializer(self.decoded_data)
        return self._json_data

    @property
    def connection_id(self) -> str:
        """Value stored under the ``connectionId`` key."""
        return self["connectionId"]

    @property
    def redelivered(self) -> bool:
        """True when the message is being resent to the consumer."""
        return self["redelivered"]

    @property
    def timestamp(self) -> int:
        """Message time, in milliseconds."""
        return self["timestamp"]

    @property
    def broker_in_time(self) -> int:
        """Time stamp (milliseconds) at which the message arrived at the broker."""
        return self["brokerInTime"]

    @property
    def broker_out_time(self) -> int:
        """Time stamp (milliseconds) at which the message left the broker."""
        return self["brokerOutTime"]

    @property
    def properties(self) -> dict:
        """Custom properties attached to the message."""
        return self["properties"]

    @property
    def destination_physicalname(self) -> str:
        """``physicalname`` entry of the message's ``destination`` mapping."""
        destination = self["destination"]
        return destination["physicalname"]

    @property
    def delivery_mode(self) -> Optional[int]:
        """Persistent or non-persistent delivery; ``None`` when the key is absent."""
        return self.get("deliveryMode", None)

    @property
    def correlation_id(self) -> Optional[str]:
        """User-defined correlation id; ``None`` when the key is absent."""
        return self.get("correlationID", None)

    @property
    def reply_to(self) -> Optional[str]:
        """User-defined reply-to value; ``None`` when the key is absent."""
        return self.get("replyTo", None)

    @property
    def get_type(self) -> Optional[str]:
        """User-defined message type (``type`` key); ``None`` when the key is absent."""
        return self.get("type", None)

    @property
    def expiration(self) -> Optional[int]:
        """Expiration attribute, given in milliseconds; ``None`` when the key is absent."""
        return self.get("expiration", None)

    @property
    def priority(self) -> Optional[int]:
        """
        Message priority, or ``None`` when the key is absent.

        JMS defines ten priority levels: 0 is the lowest and 9 the highest.
        Clients should treat 0-4 as gradations of normal priority and 5-9 as
        gradations of expedited priority.

        A JMS provider is not required to implement strict priority ordering,
        but it should do its best to deliver expedited messages ahead of
        normal ones.
        """
        return self.get("priority", None)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var broker_in_time : int

Time stamp (in milliseconds) for when the message arrived at the broker.

Expand source code
@property
def broker_in_time(self) -> int:
    """Time stamp (milliseconds) at which the message arrived at the broker."""
    arrived_at = self["brokerInTime"]
    return arrived_at
var broker_out_time : int

Time stamp (in milliseconds) for when the message left the broker.

Expand source code
@property
def broker_out_time(self) -> int:
    """Time stamp (milliseconds) at which the message left the broker."""
    left_at = self["brokerOutTime"]
    return left_at
var connection_id : str
Expand source code
@property
def connection_id(self) -> str:
    """Value stored under the ``connectionId`` key of the message."""
    connection = self["connectionId"]
    return connection
var correlation_id : Optional[str]

User defined correlation id

Expand source code
@property
def correlation_id(self) -> Optional[str]:
    """User-defined correlation id, or ``None`` when ``correlationID`` is absent."""
    return self.get("correlationID", None)
var data : str
Expand source code
@property
def data(self) -> str:
    """Raw value stored under the ``data`` key of the message."""
    payload = self["data"]
    return payload
var decoded_data : str

Decodes the data as a str

Expand source code
@property
def decoded_data(self) -> str:
    """Return the ``data`` field after base64 decoding, as a str."""
    encoded = self.data
    return base64_decode(encoded)
var delivery_mode : Optional[int]

persistent or non-persistent delivery

Expand source code
@property
def delivery_mode(self) -> Optional[int]:
    """Persistent or non-persistent delivery; ``None`` when ``deliveryMode`` is absent."""
    return self.get("deliveryMode", None)
var destination_physicalname : str
Expand source code
@property
def destination_physicalname(self) -> str:
    """``physicalname`` entry of the message's ``destination`` mapping."""
    destination = self["destination"]
    return destination["physicalname"]
var expiration : Optional[int]

Expiration attribute whose value is given in milliseconds

Expand source code
@property
def expiration(self) -> Optional[int]:
    """Expiration attribute, given in milliseconds; ``None`` when the key is absent."""
    return self.get("expiration", None)
var get_type : Optional[str]

User defined message type

Expand source code
@property
def get_type(self) -> Optional[str]:
    """User-defined message type (``type`` key), or ``None`` when absent."""
    return self.get("type", None)
var json_data : Any

Parses the data as json

Expand source code
@property
def json_data(self) -> Any:
    """Return ``decoded_data`` parsed as JSON.

    The parsed value is stored in ``self._json_data`` and reused on later
    accesses for as long as it is not ``None``.
    """
    cached = self._json_data
    if cached is not None:
        return cached
    self._json_data = self._json_deserializer(self.decoded_data)
    return self._json_data
var message_id : str

Unique identifier for the message

Expand source code
@property
def message_id(self) -> str:
    """Unique identifier of the message (``messageID`` key)."""
    identifier = self["messageID"]
    return identifier
var message_type : str
Expand source code
@property
def message_type(self) -> str:
    """Value stored under the ``messageType`` key of the message."""
    kind = self["messageType"]
    return kind
var priority : Optional[int]

JMS defines a ten-level priority value, with 0 as the lowest priority and 9 as the highest. In addition, clients should consider priorities 0-4 as gradations of normal priority and priorities 5-9 as gradations of expedited priority.

JMS does not require that a provider strictly implement priority ordering of messages; however, it should do its best to deliver expedited messages ahead of normal messages.

Expand source code
@property
def priority(self) -> Optional[int]:
    """
    Message priority, or ``None`` when the ``priority`` key is absent.

    JMS defines ten priority levels: 0 is the lowest and 9 the highest.
    Clients should treat 0-4 as gradations of normal priority and 5-9 as
    gradations of expedited priority.

    A JMS provider is not required to implement strict priority ordering,
    but it should do its best to deliver expedited messages ahead of
    normal ones.
    """
    return self.get("priority", None)
var properties : dict

Custom properties

Expand source code
@property
def properties(self) -> dict:
    """Custom properties attached to the message (``properties`` key)."""
    custom = self["properties"]
    return custom
var redelivered : bool

true if the message is being resent to the consumer

Expand source code
@property
def redelivered(self) -> bool:
    """True when the message is being resent to the consumer."""
    resent = self["redelivered"]
    return resent
var reply_to : Optional[str]

User defined reply to

Expand source code
@property
def reply_to(self) -> Optional[str]:
    """User-defined reply-to value, or ``None`` when ``replyTo`` is absent."""
    return self.get("replyTo", None)
var timestamp : int

Time in milliseconds.

Expand source code
@property
def timestamp(self) -> int:
    """Message time, in milliseconds (``timestamp`` key)."""
    millis = self["timestamp"]
    return millis

Inherited members