Module aws_lambda_powertools.utilities.data_classes.rabbit_mq_event
Expand source code
from functools import cached_property
from typing import Any, Dict, List
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.shared_functions import base64_decode
class BasicProperties(DictWrapper):
    """Read-only accessors for the ``basicProperties`` mapping of a message.

    Each attribute is exposed as a property and simply returns the value
    stored under the matching camelCase key of the wrapped dict.
    """

    @property
    def content_type(self) -> str:
        """Value stored under the ``contentType`` key."""
        return self["contentType"]

    @property
    def content_encoding(self) -> str:
        """Value stored under the ``contentEncoding`` key."""
        return self["contentEncoding"]

    @property
    def headers(self) -> Dict[str, Any]:
        """Mapping stored under the ``headers`` key."""
        return self["headers"]

    @property
    def delivery_mode(self) -> int:
        """Value stored under the ``deliveryMode`` key."""
        return self["deliveryMode"]

    @property
    def priority(self) -> int:
        """Value stored under the ``priority`` key."""
        return self["priority"]

    @property
    def correlation_id(self) -> str:
        """Value stored under the ``correlationId`` key."""
        return self["correlationId"]

    @property
    def reply_to(self) -> str:
        """Value stored under the ``replyTo`` key."""
        return self["replyTo"]

    @property
    def expiration(self) -> str:
        """Value stored under the ``expiration`` key."""
        return self["expiration"]

    @property
    def message_id(self) -> str:
        """Value stored under the ``messageId`` key."""
        return self["messageId"]

    @property
    def timestamp(self) -> str:
        """Value stored under the ``timestamp`` key."""
        return self["timestamp"]

    @property
    def get_type(self) -> str:
        """Value stored under the ``type`` key."""
        return self["type"]

    @property
    def user_id(self) -> str:
        """Value stored under the ``userId`` key."""
        return self["userId"]

    @property
    def app_id(self) -> str:
        """Value stored under the ``appId`` key."""
        return self["appId"]

    @property
    def cluster_id(self) -> str:
        """Value stored under the ``clusterId`` key."""
        return self["clusterId"]

    @property
    def body_size(self) -> int:
        """Value stored under the ``bodySize`` key."""
        return self["bodySize"]
class RabbitMessage(DictWrapper):
    """Read-only view over a single RabbitMQ message entry of the event."""

    @property
    def basic_properties(self) -> BasicProperties:
        """The ``basicProperties`` mapping wrapped in a ``BasicProperties`` view."""
        return BasicProperties(self["basicProperties"])

    @property
    def redelivered(self) -> bool:
        """Value stored under the ``redelivered`` key."""
        return self["redelivered"]

    @property
    def data(self) -> str:
        """Raw (still encoded) value stored under the ``data`` key."""
        return self["data"]

    @property
    def decoded_data(self) -> str:
        """Decodes the data as a str"""
        # The call was truncated in the listing; ``data`` is the only payload
        # field on this class, so it is what gets base64-decoded.
        return base64_decode(self.data)

    @cached_property
    def json_data(self) -> Any:
        """Parses the data as json"""
        # cached_property: the decoded payload is deserialized once per instance.
        return self._json_deserializer(self.decoded_data)
class RabbitMQEvent(DictWrapper):
    """Represents a Rabbit MQ event sent to Lambda

    On construction every raw message found under ``rmqMessagesByQueue`` is
    wrapped in a ``RabbitMessage``, keeping the original per-queue grouping.
    """

    def __init__(self, data: Dict[str, Any]):
        """Wrap ``data`` and pre-build the per-queue ``RabbitMessage`` lists.

        Parameters
        ----------
        data : Dict[str, Any]
            Lambda Event Source Event payload
        """
        # The base class must be initialised first: the ``self[...]`` lookup
        # below reads from the dict it stores.
        super().__init__(data)
        self._rmq_messages_by_queue = {
            key: [RabbitMessage(message) for message in messages]
            for key, messages in self["rmqMessagesByQueue"].items()
        }

    @property
    def event_source(self) -> str:
        """Value stored under the ``eventSource`` key."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str:
        """The Amazon Resource Name (ARN) of the event source"""
        return self["eventSourceArn"]

    @property
    def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
        """Wrapped messages grouped by the keys of ``rmqMessagesByQueue``."""
        return self._rmq_messages_by_queue
class BasicProperties (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
Provides a single read only access to a wrapper dict
Parameters
data : Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer : Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class BasicProperties(DictWrapper):
    """Property-style, read-only access to a message's basic properties.

    Every property below is a direct lookup of one camelCase key in the
    wrapped dict.
    """

    @property
    def content_type(self) -> str:
        """Return the ``contentType`` entry."""
        return self["contentType"]

    @property
    def content_encoding(self) -> str:
        """Return the ``contentEncoding`` entry."""
        return self["contentEncoding"]

    @property
    def headers(self) -> Dict[str, Any]:
        """Return the ``headers`` entry."""
        return self["headers"]

    @property
    def delivery_mode(self) -> int:
        """Return the ``deliveryMode`` entry."""
        return self["deliveryMode"]

    @property
    def priority(self) -> int:
        """Return the ``priority`` entry."""
        return self["priority"]

    @property
    def correlation_id(self) -> str:
        """Return the ``correlationId`` entry."""
        return self["correlationId"]

    @property
    def reply_to(self) -> str:
        """Return the ``replyTo`` entry."""
        return self["replyTo"]

    @property
    def expiration(self) -> str:
        """Return the ``expiration`` entry."""
        return self["expiration"]

    @property
    def message_id(self) -> str:
        """Return the ``messageId`` entry."""
        return self["messageId"]

    @property
    def timestamp(self) -> str:
        """Return the ``timestamp`` entry."""
        return self["timestamp"]

    @property
    def get_type(self) -> str:
        """Return the ``type`` entry."""
        return self["type"]

    @property
    def user_id(self) -> str:
        """Return the ``userId`` entry."""
        return self["userId"]

    @property
    def app_id(self) -> str:
        """Return the ``appId`` entry."""
        return self["appId"]

    @property
    def cluster_id(self) -> str:
        """Return the ``clusterId`` entry."""
        return self["clusterId"]

    @property
    def body_size(self) -> int:
        """Return the ``bodySize`` entry."""
        return self["bodySize"]
- DictWrapper
Instance variables
var app_id : str
Expand source code
@property
def app_id(self) -> str:
    """Value stored under the ``appId`` key of the wrapped dict."""
    return self["appId"]
var body_size : int
Expand source code
@property
def body_size(self) -> int:
    """Value stored under the ``bodySize`` key of the wrapped dict."""
    return self["bodySize"]
var cluster_id : str
Expand source code
@property
def cluster_id(self) -> str:
    """Value stored under the ``clusterId`` key of the wrapped dict."""
    return self["clusterId"]
var content_encoding : str
Expand source code
@property
def content_encoding(self) -> str:
    """Value stored under the ``contentEncoding`` key of the wrapped dict."""
    return self["contentEncoding"]
var content_type : str
Expand source code
@property
def content_type(self) -> str:
    """Value stored under the ``contentType`` key of the wrapped dict."""
    return self["contentType"]
var correlation_id : str
Expand source code
@property
def correlation_id(self) -> str:
    """Value stored under the ``correlationId`` key of the wrapped dict."""
    return self["correlationId"]
var delivery_mode : int
Expand source code
@property
def delivery_mode(self) -> int:
    """Value stored under the ``deliveryMode`` key of the wrapped dict."""
    return self["deliveryMode"]
var expiration : str
Expand source code
@property
def expiration(self) -> str:
    """Value stored under the ``expiration`` key of the wrapped dict."""
    return self["expiration"]
var get_type : str
Expand source code
@property
def get_type(self) -> str:
    """Value stored under the ``type`` key of the wrapped dict."""
    return self["type"]
var headers : Dict[str, Any]
Expand source code
@property
def headers(self) -> Dict[str, Any]:
    """Mapping stored under the ``headers`` key of the wrapped dict."""
    return self["headers"]
var message_id : str
Expand source code
@property
def message_id(self) -> str:
    """Value stored under the ``messageId`` key of the wrapped dict."""
    return self["messageId"]
var priority : int
Expand source code
@property
def priority(self) -> int:
    """Value stored under the ``priority`` key of the wrapped dict."""
    return self["priority"]
var reply_to : str
Expand source code
@property
def reply_to(self) -> str:
    """Value stored under the ``replyTo`` key of the wrapped dict."""
    return self["replyTo"]
var timestamp : str
Expand source code
@property
def timestamp(self) -> str:
    """Value stored under the ``timestamp`` key of the wrapped dict."""
    return self["timestamp"]
var user_id : str
Expand source code
@property
def user_id(self) -> str:
    """Value stored under the ``userId`` key of the wrapped dict."""
    return self["userId"]
Inherited members
class RabbitMQEvent (data: Dict[str, Any])
Represents a Rabbit MQ event sent to Lambda
Parameters
data : Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer : Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class RabbitMQEvent(DictWrapper):
    """Wrapper around a Rabbit MQ event delivered to a Lambda function.

    The raw messages under ``rmqMessagesByQueue`` are wrapped in
    ``RabbitMessage`` objects once, at construction time.
    """

    def __init__(self, data: Dict[str, Any]):
        """Store ``data`` and build the per-queue lists of wrapped messages."""
        super().__init__(data)
        wrapped_by_queue = {}
        for queue_key, raw_messages in self["rmqMessagesByQueue"].items():
            wrapped_by_queue[queue_key] = [RabbitMessage(raw) for raw in raw_messages]
        self._rmq_messages_by_queue = wrapped_by_queue

    @property
    def event_source(self) -> str:
        """Return the ``eventSource`` entry."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str:
        """Return the event source's Amazon Resource Name (``eventSourceArn``)."""
        return self["eventSourceArn"]

    @property
    def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
        """Return the wrapped messages, grouped as in ``rmqMessagesByQueue``."""
        return self._rmq_messages_by_queue
- DictWrapper
Instance variables
var event_source : str
Expand source code
@property
def event_source(self) -> str:
    """Value stored under the ``eventSource`` key of the wrapped dict."""
    return self["eventSource"]
var event_source_arn : str
The Amazon Resource Name (ARN) of the event source
Expand source code
@property
def event_source_arn(self) -> str:
    """Return the event source's Amazon Resource Name, read from ``eventSourceArn``."""
    return self["eventSourceArn"]
var rmq_messages_by_queue : Dict[str, List[RabbitMessage]]
Expand source code
@property
def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
    """Return the per-queue lists of wrapped messages held in ``_rmq_messages_by_queue``."""
    return self._rmq_messages_by_queue
Inherited members
class RabbitMessage (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
Provides a single read only access to a wrapper dict
Parameters
data : Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer : Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class RabbitMessage(DictWrapper):
    """Read-only view over a single RabbitMQ message entry of the event."""

    @property
    def basic_properties(self) -> BasicProperties:
        """The ``basicProperties`` mapping wrapped in a ``BasicProperties`` view."""
        return BasicProperties(self["basicProperties"])

    @property
    def redelivered(self) -> bool:
        """Value stored under the ``redelivered`` key."""
        return self["redelivered"]

    @property
    def data(self) -> str:
        """Raw (still encoded) value stored under the ``data`` key."""
        return self["data"]

    @property
    def decoded_data(self) -> str:
        """Decodes the data as a str"""
        # The rendered snippet cut the call short; ``data`` is the payload
        # field this class exposes, so it is the value that gets decoded.
        return base64_decode(self.data)

    @cached_property
    def json_data(self) -> Any:
        """Parses the data as json"""
        # cached_property: the decoded payload is deserialized once per instance.
        return self._json_deserializer(self.decoded_data)
- DictWrapper
Instance variables
var basic_properties : BasicProperties
Expand source code
@property
def basic_properties(self) -> BasicProperties:
    """Wrap the ``basicProperties`` entry in a ``BasicProperties`` view and return it."""
    raw_properties = self["basicProperties"]
    return BasicProperties(raw_properties)
var data : str
Expand source code
@property
def data(self) -> str:
    """Value stored under the ``data`` key of the wrapped dict."""
    return self["data"]
var decoded_data : str
Decodes the data as a str
Expand source code
@property
def decoded_data(self) -> str:
    """Decodes the data as a str"""
    # The rendered snippet ended at the opening parenthesis; the message's
    # ``data`` payload is the value that is base64-decoded.
    return base64_decode(self.data)
var json_data
Parses the data as json
Expand source code
def __get__(self, instance, owner=None):
    """Return the cached value for ``instance``, computing and storing it on first access.

    Accessed on the class (``instance is None``) the descriptor itself is
    returned. Raises ``TypeError`` when the descriptor has no attribute name
    yet, when the instance has no ``__dict__``, or when that ``__dict__``
    rejects item assignment.
    """
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        store = instance.__dict__
    except AttributeError:
        # Not every object has a __dict__ (for example a class defining slots).
        raise TypeError(
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        ) from None
    result = store.get(self.attrname, _NOT_FOUND)
    if result is not _NOT_FOUND:
        # Already computed on an earlier access.
        return result
    result = self.func(instance)
    try:
        store[self.attrname] = result
    except TypeError:
        raise TypeError(
            f"The '__dict__' attribute on {type(instance).__name__!r} instance "
            f"does not support item assignment for caching {self.attrname!r} property."
        ) from None
    return result
var redelivered : bool
Expand source code
@property
def redelivered(self) -> bool:
    """Value stored under the ``redelivered`` key of the wrapped dict."""
    return self["redelivered"]
Inherited members