Module aws_lambda_powertools.utilities.data_classes.active_mq_event
Expand source code
from typing import Any, Dict, Iterator, Optional
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.shared_functions import base64_decode
class ActiveMQMessage(DictWrapper):
@property
def message_id(self) -> str:
"""Unique identifier for the message"""
return self["messageID"]
@property
def message_type(self) -> str:
return self["messageType"]
@property
def data(self) -> str:
return self["data"]
@property
def decoded_data(self) -> str:
"""Decodes the data as a str"""
return base64_decode(self.data)
@property
def json_data(self) -> Any:
"""Parses the data as json"""
if self._json_data is None:
self._json_data = self._json_deserializer(self.decoded_data)
return self._json_data
@property
def connection_id(self) -> str:
return self["connectionId"]
@property
def redelivered(self) -> bool:
"""true if the message is being resent to the consumer"""
return self["redelivered"]
@property
def timestamp(self) -> int:
"""Time in milliseconds."""
return self["timestamp"]
@property
def broker_in_time(self) -> int:
"""Time stamp (in milliseconds) for when the message arrived at the broker."""
return self["brokerInTime"]
@property
def broker_out_time(self) -> int:
"""Time stamp (in milliseconds) for when the message left the broker."""
return self["brokerOutTime"]
@property
def properties(self) -> dict:
"""Custom properties"""
return self["properties"]
@property
def destination_physicalname(self) -> str:
return self["destination"]["physicalname"]
@property
def delivery_mode(self) -> Optional[int]:
"""persistent or non-persistent delivery"""
return self.get("deliveryMode")
@property
def correlation_id(self) -> Optional[str]:
"""User defined correlation id"""
return self.get("correlationID")
@property
def reply_to(self) -> Optional[str]:
"""User defined reply to"""
return self.get("replyTo")
@property
def get_type(self) -> Optional[str]:
"""User defined message type"""
return self.get("type")
@property
def expiration(self) -> Optional[int]:
"""Expiration attribute whose value is given in milliseconds"""
return self.get("expiration")
@property
def priority(self) -> Optional[int]:
"""
JMS defines a ten-level priority value, with 0 as the lowest priority and 9
as the highest. In addition, clients should consider priorities 0-4 as
gradations of normal priority and priorities 5-9 as gradations of expedited
priority.
JMS does not require that a provider strictly implement priority ordering
of messages; however, it should do its best to deliver expedited messages
ahead of normal messages.
"""
return self.get("priority")
class ActiveMQEvent(DictWrapper):
"""Represents an Active MQ event sent to Lambda
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
- https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/
"""
def __init__(self, data: Dict[str, Any]):
super().__init__(data)
self._messages: Optional[Iterator[ActiveMQMessage]] = None
@property
def event_source(self) -> str:
return self["eventSource"]
@property
def event_source_arn(self) -> str:
"""The Amazon Resource Name (ARN) of the event source"""
return self["eventSourceArn"]
@property
def messages(self) -> Iterator[ActiveMQMessage]:
for record in self["messages"]:
yield ActiveMQMessage(record, json_deserializer=self._json_deserializer)
@property
def message(self) -> ActiveMQMessage:
"""
Returns the next ActiveMQ message using an iterator
Returns
-------
ActiveMQMessage
The next activemq message.
Raises
------
StopIteration
If there are no more records available.
"""
if self._messages is None:
self._messages = self.messages
return next(self._messages)
Classes
class ActiveMQEvent (data: Dict[str, Any])
-
Represents an Active MQ event sent to Lambda
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
- https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class ActiveMQEvent(DictWrapper): """Represents an Active MQ event sent to Lambda Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html - https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/ """ def __init__(self, data: Dict[str, Any]): super().__init__(data) self._messages: Optional[Iterator[ActiveMQMessage]] = None @property def event_source(self) -> str: return self["eventSource"] @property def event_source_arn(self) -> str: """The Amazon Resource Name (ARN) of the event source""" return self["eventSourceArn"] @property def messages(self) -> Iterator[ActiveMQMessage]: for record in self["messages"]: yield ActiveMQMessage(record, json_deserializer=self._json_deserializer) @property def message(self) -> ActiveMQMessage: """ Returns the next ActiveMQ message using an iterator Returns ------- ActiveMQMessage The next activemq message. Raises ------ StopIteration If there are no more records available. """ if self._messages is None: self._messages = self.messages return next(self._messages)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var event_source : str
-
Expand source code
@property def event_source(self) -> str: return self["eventSource"]
var event_source_arn : str
-
The Amazon Resource Name (ARN) of the event source
Expand source code
@property def event_source_arn(self) -> str: """The Amazon Resource Name (ARN) of the event source""" return self["eventSourceArn"]
var message : ActiveMQMessage
-
Returns the next ActiveMQ message using an iterator
Returns
ActiveMQMessage
- The next activemq message.
Raises
StopIteration
- If there are no more records available.
Expand source code
@property def message(self) -> ActiveMQMessage: """ Returns the next ActiveMQ message using an iterator Returns ------- ActiveMQMessage The next activemq message. Raises ------ StopIteration If there are no more records available. """ if self._messages is None: self._messages = self.messages return next(self._messages)
var messages : Iterator[ActiveMQMessage]
-
Expand source code
@property def messages(self) -> Iterator[ActiveMQMessage]: for record in self["messages"]: yield ActiveMQMessage(record, json_deserializer=self._json_deserializer)
Inherited members
class ActiveMQMessage (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class ActiveMQMessage(DictWrapper): @property def message_id(self) -> str: """Unique identifier for the message""" return self["messageID"] @property def message_type(self) -> str: return self["messageType"] @property def data(self) -> str: return self["data"] @property def decoded_data(self) -> str: """Decodes the data as a str""" return base64_decode(self.data) @property def json_data(self) -> Any: """Parses the data as json""" if self._json_data is None: self._json_data = self._json_deserializer(self.decoded_data) return self._json_data @property def connection_id(self) -> str: return self["connectionId"] @property def redelivered(self) -> bool: """true if the message is being resent to the consumer""" return self["redelivered"] @property def timestamp(self) -> int: """Time in milliseconds.""" return self["timestamp"] @property def broker_in_time(self) -> int: """Time stamp (in milliseconds) for when the message arrived at the broker.""" return self["brokerInTime"] @property def broker_out_time(self) -> int: """Time stamp (in milliseconds) for when the message left the broker.""" return self["brokerOutTime"] @property def properties(self) -> dict: """Custom properties""" return self["properties"] @property def destination_physicalname(self) -> str: return self["destination"]["physicalname"] @property def delivery_mode(self) -> Optional[int]: """persistent or non-persistent delivery""" return self.get("deliveryMode") @property def correlation_id(self) -> Optional[str]: """User defined correlation id""" return self.get("correlationID") @property def reply_to(self) -> Optional[str]: """User defined reply to""" return self.get("replyTo") @property def get_type(self) -> Optional[str]: """User defined message type""" return self.get("type") @property def expiration(self) -> Optional[int]: """Expiration attribute whose value is given in milliseconds""" return self.get("expiration") @property def priority(self) -> Optional[int]: """ JMS defines a ten-level priority value, with 0 as the lowest priority and 9 as the highest. In addition, clients should consider priorities 0-4 as gradations of normal priority and priorities 5-9 as gradations of expedited priority. JMS does not require that a provider strictly implement priority ordering of messages; however, it should do its best to deliver expedited messages ahead of normal messages. """ return self.get("priority")
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var broker_in_time : int
-
Time stamp (in milliseconds) for when the message arrived at the broker.
Expand source code
@property def broker_in_time(self) -> int: """Time stamp (in milliseconds) for when the message arrived at the broker.""" return self["brokerInTime"]
var broker_out_time : int
-
Time stamp (in milliseconds) for when the message left the broker.
Expand source code
@property def broker_out_time(self) -> int: """Time stamp (in milliseconds) for when the message left the broker.""" return self["brokerOutTime"]
var connection_id : str
-
Expand source code
@property def connection_id(self) -> str: return self["connectionId"]
var correlation_id : Optional[str]
-
User defined correlation id
Expand source code
@property def correlation_id(self) -> Optional[str]: """User defined correlation id""" return self.get("correlationID")
var data : str
-
Expand source code
@property def data(self) -> str: return self["data"]
var decoded_data : str
-
Decodes the data as a str
Expand source code
@property def decoded_data(self) -> str: """Decodes the data as a str""" return base64_decode(self.data)
var delivery_mode : Optional[int]
-
persistent or non-persistent delivery
Expand source code
@property def delivery_mode(self) -> Optional[int]: """persistent or non-persistent delivery""" return self.get("deliveryMode")
var destination_physicalname : str
-
Expand source code
@property def destination_physicalname(self) -> str: return self["destination"]["physicalname"]
var expiration : Optional[int]
-
Expiration attribute whose value is given in milliseconds
Expand source code
@property def expiration(self) -> Optional[int]: """Expiration attribute whose value is given in milliseconds""" return self.get("expiration")
var get_type : Optional[str]
-
User defined message type
Expand source code
@property def get_type(self) -> Optional[str]: """User defined message type""" return self.get("type")
var json_data : Any
-
Parses the data as json
Expand source code
@property def json_data(self) -> Any: """Parses the data as json""" if self._json_data is None: self._json_data = self._json_deserializer(self.decoded_data) return self._json_data
var message_id : str
-
Unique identifier for the message
Expand source code
@property def message_id(self) -> str: """Unique identifier for the message""" return self["messageID"]
var message_type : str
-
Expand source code
@property def message_type(self) -> str: return self["messageType"]
var priority : Optional[int]
-
JMS defines a ten-level priority value, with 0 as the lowest priority and 9 as the highest. In addition, clients should consider priorities 0-4 as gradations of normal priority and priorities 5-9 as gradations of expedited priority.
JMS does not require that a provider strictly implement priority ordering of messages; however, it should do its best to deliver expedited messages ahead of normal messages.
Expand source code
@property def priority(self) -> Optional[int]: """ JMS defines a ten-level priority value, with 0 as the lowest priority and 9 as the highest. In addition, clients should consider priorities 0-4 as gradations of normal priority and priorities 5-9 as gradations of expedited priority. JMS does not require that a provider strictly implement priority ordering of messages; however, it should do its best to deliver expedited messages ahead of normal messages. """ return self.get("priority")
var properties : dict
-
Custom properties
Expand source code
@property def properties(self) -> dict: """Custom properties""" return self["properties"]
var redelivered : bool
-
true if the message is being resent to the consumer
Expand source code
@property def redelivered(self) -> bool: """true if the message is being resent to the consumer""" return self["redelivered"]
var reply_to : Optional[str]
-
User defined reply to
Expand source code
@property def reply_to(self) -> Optional[str]: """User defined reply to""" return self.get("replyTo")
var timestamp : int
-
Time in milliseconds.
Expand source code
@property def timestamp(self) -> int: """Time in milliseconds.""" return self["timestamp"]
Inherited members