Module aws_lambda_powertools.utilities.data_classes.active_mq_event
Classes
class ActiveMQEvent (data: Dict[str, Any])
-
Represents an Active MQ event sent to Lambda
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
- https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class ActiveMQEvent(DictWrapper): """Represents an Active MQ event sent to Lambda Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html - https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/ """ def __init__(self, data: Dict[str, Any]): super().__init__(data) self._messages: Optional[Iterator[ActiveMQMessage]] = None @property def event_source(self) -> str: return self["eventSource"] @property def event_source_arn(self) -> str: """The Amazon Resource Name (ARN) of the event source""" return self["eventSourceArn"] @property def messages(self) -> Iterator[ActiveMQMessage]: for record in self["messages"]: yield ActiveMQMessage(record, json_deserializer=self._json_deserializer) @property def message(self) -> ActiveMQMessage: """ Returns the next ActiveMQ message using an iterator Returns ------- ActiveMQMessage The next activemq message. Raises ------ StopIteration If there are no more records available. """ if self._messages is None: self._messages = self.messages return next(self._messages)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
prop event_source : str
-
Expand source code
@property def event_source(self) -> str: return self["eventSource"]
prop event_source_arn : str
-
The Amazon Resource Name (ARN) of the event source
Expand source code
@property def event_source_arn(self) -> str: """The Amazon Resource Name (ARN) of the event source""" return self["eventSourceArn"]
prop message : ActiveMQMessage
-
Returns the next ActiveMQ message using an iterator
Returns
ActiveMQMessage
- The next activemq message.
Raises
StopIteration
- If there are no more records available.
Expand source code
@property def message(self) -> ActiveMQMessage: """ Returns the next ActiveMQ message using an iterator Returns ------- ActiveMQMessage The next activemq message. Raises ------ StopIteration If there are no more records available. """ if self._messages is None: self._messages = self.messages return next(self._messages)
prop messages : Iterator[ActiveMQMessage]
-
Expand source code
@property def messages(self) -> Iterator[ActiveMQMessage]: for record in self["messages"]: yield ActiveMQMessage(record, json_deserializer=self._json_deserializer)
Inherited members
class ActiveMQMessage (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class ActiveMQMessage(DictWrapper): @property def message_id(self) -> str: """Unique identifier for the message""" return self["messageID"] @property def message_type(self) -> str: return self["messageType"] @property def data(self) -> str: return self["data"] @property def decoded_data(self) -> str: """Decodes the data as a str""" return base64_decode(self.data) @cached_property def json_data(self) -> Any: return self._json_deserializer(self.decoded_data) @property def connection_id(self) -> str: return self["connectionId"] @property def redelivered(self) -> bool: """true if the message is being resent to the consumer""" return self["redelivered"] @property def timestamp(self) -> int: """Time in milliseconds.""" return self["timestamp"] @property def broker_in_time(self) -> int: """Time stamp (in milliseconds) for when the message arrived at the broker.""" return self["brokerInTime"] @property def broker_out_time(self) -> int: """Time stamp (in milliseconds) for when the message left the broker.""" return self["brokerOutTime"] @property def properties(self) -> dict: """Custom properties""" return self["properties"] @property def destination_physicalname(self) -> str: return self["destination"]["physicalName"] @property def delivery_mode(self) -> Optional[int]: """persistent or non-persistent delivery""" return self.get("deliveryMode") @property def correlation_id(self) -> Optional[str]: """User defined correlation id""" return self.get("correlationID") @property def reply_to(self) -> Optional[str]: """User defined reply to""" return self.get("replyTo") @property def get_type(self) -> Optional[str]: """User defined message type""" return self.get("type") @property def expiration(self) -> Optional[int]: """Expiration attribute whose value is given in milliseconds""" return self.get("expiration") @property def priority(self) -> Optional[int]: """ JMS defines a ten-level priority value, with 0 as the lowest priority and 9 as the highest. In addition, clients should consider priorities 0-4 as gradations of normal priority and priorities 5-9 as gradations of expedited priority. JMS does not require that a provider strictly implement priority ordering of messages; however, it should do its best to deliver expedited messages ahead of normal messages. """ return self.get("priority")
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
prop broker_in_time : int
-
Time stamp (in milliseconds) for when the message arrived at the broker.
Expand source code
@property def broker_in_time(self) -> int: """Time stamp (in milliseconds) for when the message arrived at the broker.""" return self["brokerInTime"]
prop broker_out_time : int
-
Time stamp (in milliseconds) for when the message left the broker.
Expand source code
@property def broker_out_time(self) -> int: """Time stamp (in milliseconds) for when the message left the broker.""" return self["brokerOutTime"]
prop connection_id : str
-
Expand source code
@property def connection_id(self) -> str: return self["connectionId"]
prop correlation_id : Optional[str]
-
User defined correlation id
Expand source code
@property def correlation_id(self) -> Optional[str]: """User defined correlation id""" return self.get("correlationID")
prop data : str
-
Expand source code
@property def data(self) -> str: return self["data"]
prop decoded_data : str
-
Decodes the data as a str
Expand source code
@property def decoded_data(self) -> str: """Decodes the data as a str""" return base64_decode(self.data)
prop delivery_mode : Optional[int]
-
persistent or non-persistent delivery
Expand source code
@property def delivery_mode(self) -> Optional[int]: """persistent or non-persistent delivery""" return self.get("deliveryMode")
prop destination_physicalname : str
-
Expand source code
@property def destination_physicalname(self) -> str: return self["destination"]["physicalName"]
prop expiration : Optional[int]
-
Expiration attribute whose value is given in milliseconds
Expand source code
@property def expiration(self) -> Optional[int]: """Expiration attribute whose value is given in milliseconds""" return self.get("expiration")
prop get_type : Optional[str]
-
User defined message type
Expand source code
@property def get_type(self) -> Optional[str]: """User defined message type""" return self.get("type")
var json_data
-
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val
prop message_id : str
-
Unique identifier for the message
Expand source code
@property def message_id(self) -> str: """Unique identifier for the message""" return self["messageID"]
prop message_type : str
-
Expand source code
@property def message_type(self) -> str: return self["messageType"]
prop priority : Optional[int]
-
JMS defines a ten-level priority value, with 0 as the lowest priority and 9 as the highest. In addition, clients should consider priorities 0-4 as gradations of normal priority and priorities 5-9 as gradations of expedited priority.
JMS does not require that a provider strictly implement priority ordering of messages; however, it should do its best to deliver expedited messages ahead of normal messages.
Expand source code
@property def priority(self) -> Optional[int]: """ JMS defines a ten-level priority value, with 0 as the lowest priority and 9 as the highest. In addition, clients should consider priorities 0-4 as gradations of normal priority and priorities 5-9 as gradations of expedited priority. JMS does not require that a provider strictly implement priority ordering of messages; however, it should do its best to deliver expedited messages ahead of normal messages. """ return self.get("priority")
prop properties : dict
-
Custom properties
Expand source code
@property def properties(self) -> dict: """Custom properties""" return self["properties"]
prop redelivered : bool
-
true if the message is being resent to the consumer
Expand source code
@property def redelivered(self) -> bool: """true if the message is being resent to the consumer""" return self["redelivered"]
prop reply_to : Optional[str]
-
User defined reply to
Expand source code
@property def reply_to(self) -> Optional[str]: """User defined reply to""" return self.get("replyTo")
prop timestamp : int
-
Time in milliseconds.
Expand source code
@property def timestamp(self) -> int: """Time in milliseconds.""" return self["timestamp"]
Inherited members