Module aws_lambda_powertools.utilities.data_classes.rabbit_mq_event

Expand source code
from typing import Any, Callable, Dict, List, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.shared_functions import base64_decode


class BasicProperties(DictWrapper):
    """Read-only accessors over the ``basicProperties`` entry of a RabbitMQ message.

    Every property is a plain key lookup on the wrapped data.
    """

    @property
    def content_type(self) -> str:
        """Return the value stored under the ``contentType`` key."""
        return self["contentType"]

    @property
    def content_encoding(self) -> str:
        """Return the value stored under the ``contentEncoding`` key."""
        return self["contentEncoding"]

    @property
    def headers(self) -> Dict[str, Any]:
        """Return the value stored under the ``headers`` key."""
        return self["headers"]

    @property
    def delivery_mode(self) -> int:
        """Return the value stored under the ``deliveryMode`` key."""
        return self["deliveryMode"]

    @property
    def priority(self) -> int:
        """Return the value stored under the ``priority`` key."""
        return self["priority"]

    @property
    def correlation_id(self) -> str:
        """Return the value stored under the ``correlationId`` key."""
        return self["correlationId"]

    @property
    def reply_to(self) -> str:
        """Return the value stored under the ``replyTo`` key."""
        return self["replyTo"]

    @property
    def expiration(self) -> str:
        """Return the value stored under the ``expiration`` key."""
        return self["expiration"]

    @property
    def message_id(self) -> str:
        """Return the value stored under the ``messageId`` key."""
        return self["messageId"]

    @property
    def timestamp(self) -> str:
        """Return the value stored under the ``timestamp`` key."""
        return self["timestamp"]

    @property
    def get_type(self) -> str:
        """Return the value stored under the ``type`` key."""
        return self["type"]

    @property
    def user_id(self) -> str:
        """Return the value stored under the ``userId`` key."""
        return self["userId"]

    @property
    def app_id(self) -> str:
        """Return the value stored under the ``appId`` key."""
        return self["appId"]

    @property
    def cluster_id(self) -> str:
        """Return the value stored under the ``clusterId`` key."""
        return self["clusterId"]

    @property
    def body_size(self) -> int:
        """Return the value stored under the ``bodySize`` key."""
        return self["bodySize"]


class RabbitMessage(DictWrapper):
    """Read-only view over a single message entry of a RabbitMQ event."""

    @property
    def basic_properties(self) -> BasicProperties:
        """Wrap the ``basicProperties`` entry in a ``BasicProperties`` object."""
        raw_properties = self["basicProperties"]
        return BasicProperties(raw_properties)

    @property
    def redelivered(self) -> bool:
        """Return the value stored under the ``redelivered`` key."""
        flag = self["redelivered"]
        return flag

    @property
    def data(self) -> str:
        """Return the value stored under the ``data`` key (the still-encoded payload)."""
        payload = self["data"]
        return payload

    @property
    def decoded_data(self) -> str:
        """Return ``data`` after passing it through ``base64_decode``."""
        encoded = self.data
        return base64_decode(encoded)

    @property
    def json_data(self) -> Any:
        """Return ``decoded_data`` parsed with the configured JSON deserializer.

        The parsed value is kept on the instance, so later accesses reuse it
        as long as it is not ``None``.
        """
        if self._json_data is not None:
            return self._json_data
        # Nothing cached yet: parse once and remember the result.
        self._json_data = self._json_deserializer(self.decoded_data)
        return self._json_data


class RabbitMQEvent(DictWrapper):
    """Represents a Rabbit MQ event sent to Lambda

    The lists found under ``rmqMessagesByQueue`` are wrapped once, at
    construction time, into lists of ``RabbitMessage`` objects.

    Parameters
    ----------
    data : Dict[str, Any]
        Lambda Event Source Event payload
    json_deserializer : Callable, optional
        function to deserialize ``str``, ``bytes``, ``bytearray`` containing a JSON document to a Python ``obj``,
        by default ``json.loads``. It is also handed to every wrapped ``RabbitMessage`` so that
        ``RabbitMessage.json_data`` uses the same deserializer as the event itself.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
    - https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/
    """

    def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None):
        # Passing ``None`` through keeps the base-class default, so existing
        # single-argument callers behave exactly as before.
        super().__init__(data, json_deserializer)
        self._rmq_messages_by_queue = {
            queue: [RabbitMessage(message, json_deserializer) for message in messages]
            for queue, messages in self["rmqMessagesByQueue"].items()
        }

    @property
    def event_source(self) -> str:
        """Return the value stored under the ``eventSource`` key."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str:
        """The Amazon Resource Name (ARN) of the event source"""
        return self["eventSourceArn"]

    @property
    def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
        """Return the mapping built at construction time from ``rmqMessagesByQueue``.

        Keys are the keys of ``rmqMessagesByQueue``; each value is the list of
        that key's messages wrapped as ``RabbitMessage`` objects.
        """
        return self._rmq_messages_by_queue

Classes

class BasicProperties (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides a single, read-only access point to a wrapped dict

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class BasicProperties(DictWrapper):
    """Read-only accessors over the ``basicProperties`` entry of a RabbitMQ message.

    Every property is a plain key lookup on the wrapped data.
    """

    @property
    def content_type(self) -> str:
        """Return the value stored under the ``contentType`` key."""
        return self["contentType"]

    @property
    def content_encoding(self) -> str:
        """Return the value stored under the ``contentEncoding`` key."""
        return self["contentEncoding"]

    @property
    def headers(self) -> Dict[str, Any]:
        """Return the value stored under the ``headers`` key."""
        return self["headers"]

    @property
    def delivery_mode(self) -> int:
        """Return the value stored under the ``deliveryMode`` key."""
        return self["deliveryMode"]

    @property
    def priority(self) -> int:
        """Return the value stored under the ``priority`` key."""
        return self["priority"]

    @property
    def correlation_id(self) -> str:
        """Return the value stored under the ``correlationId`` key."""
        return self["correlationId"]

    @property
    def reply_to(self) -> str:
        """Return the value stored under the ``replyTo`` key."""
        return self["replyTo"]

    @property
    def expiration(self) -> str:
        """Return the value stored under the ``expiration`` key."""
        return self["expiration"]

    @property
    def message_id(self) -> str:
        """Return the value stored under the ``messageId`` key."""
        return self["messageId"]

    @property
    def timestamp(self) -> str:
        """Return the value stored under the ``timestamp`` key."""
        return self["timestamp"]

    @property
    def get_type(self) -> str:
        """Return the value stored under the ``type`` key."""
        return self["type"]

    @property
    def user_id(self) -> str:
        """Return the value stored under the ``userId`` key."""
        return self["userId"]

    @property
    def app_id(self) -> str:
        """Return the value stored under the ``appId`` key."""
        return self["appId"]

    @property
    def cluster_id(self) -> str:
        """Return the value stored under the ``clusterId`` key."""
        return self["clusterId"]

    @property
    def body_size(self) -> int:
        """Return the value stored under the ``bodySize`` key."""
        return self["bodySize"]

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var app_id : str
Expand source code
@property
def app_id(self) -> str:
    """Return the value stored under the ``appId`` key."""
    return self["appId"]
var body_size : int
Expand source code
@property
def body_size(self) -> int:
    """Return the value stored under the ``bodySize`` key."""
    return self["bodySize"]
var cluster_id : str
Expand source code
@property
def cluster_id(self) -> str:
    """Return the value stored under the ``clusterId`` key."""
    return self["clusterId"]
var content_encoding : str
Expand source code
@property
def content_encoding(self) -> str:
    """Return the value stored under the ``contentEncoding`` key."""
    return self["contentEncoding"]
var content_type : str
Expand source code
@property
def content_type(self) -> str:
    """Return the value stored under the ``contentType`` key."""
    return self["contentType"]
var correlation_id : str
Expand source code
@property
def correlation_id(self) -> str:
    """Return the value stored under the ``correlationId`` key."""
    return self["correlationId"]
var delivery_mode : int
Expand source code
@property
def delivery_mode(self) -> int:
    """Return the value stored under the ``deliveryMode`` key."""
    return self["deliveryMode"]
var expiration : str
Expand source code
@property
def expiration(self) -> str:
    """Return the value stored under the ``expiration`` key."""
    return self["expiration"]
var get_type : str
Expand source code
@property
def get_type(self) -> str:
    """Return the value stored under the ``type`` key."""
    return self["type"]
var headers : Dict[str, Any]
Expand source code
@property
def headers(self) -> Dict[str, Any]:
    """Return the value stored under the ``headers`` key."""
    return self["headers"]
var message_id : str
Expand source code
@property
def message_id(self) -> str:
    """Return the value stored under the ``messageId`` key."""
    return self["messageId"]
var priority : int
Expand source code
@property
def priority(self) -> int:
    """Return the value stored under the ``priority`` key."""
    return self["priority"]
var reply_to : str
Expand source code
@property
def reply_to(self) -> str:
    """Return the value stored under the ``replyTo`` key."""
    return self["replyTo"]
var timestamp : str
Expand source code
@property
def timestamp(self) -> str:
    """Return the value stored under the ``timestamp`` key."""
    return self["timestamp"]
var user_id : str
Expand source code
@property
def user_id(self) -> str:
    """Return the value stored under the ``userId`` key."""
    return self["userId"]

Inherited members

class RabbitMQEvent (data: Dict[str, Any])

Represents a Rabbit MQ event sent to Lambda

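Documentation:

- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
- https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/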

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class RabbitMQEvent(DictWrapper):
    """Represents a Rabbit MQ event sent to Lambda

    The lists found under ``rmqMessagesByQueue`` are wrapped once, at
    construction time, into lists of ``RabbitMessage`` objects.

    Parameters
    ----------
    data : Dict[str, Any]
        Lambda Event Source Event payload
    json_deserializer : Callable, optional
        function to deserialize ``str``, ``bytes``, ``bytearray`` containing a JSON document to a Python ``obj``,
        by default ``json.loads``. It is also handed to every wrapped ``RabbitMessage`` so that
        ``RabbitMessage.json_data`` uses the same deserializer as the event itself.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
    - https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/
    """

    def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None):
        # Passing ``None`` through keeps the base-class default, so existing
        # single-argument callers behave exactly as before.
        super().__init__(data, json_deserializer)
        self._rmq_messages_by_queue = {
            queue: [RabbitMessage(message, json_deserializer) for message in messages]
            for queue, messages in self["rmqMessagesByQueue"].items()
        }

    @property
    def event_source(self) -> str:
        """Return the value stored under the ``eventSource`` key."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str:
        """The Amazon Resource Name (ARN) of the event source"""
        return self["eventSourceArn"]

    @property
    def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
        """Return the mapping built at construction time from ``rmqMessagesByQueue``.

        Keys are the keys of ``rmqMessagesByQueue``; each value is the list of
        that key's messages wrapped as ``RabbitMessage`` objects.
        """
        return self._rmq_messages_by_queue

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var event_source : str
Expand source code
@property
def event_source(self) -> str:
    """Return the value stored under the ``eventSource`` key."""
    return self["eventSource"]
var event_source_arn : str

The Amazon Resource Name (ARN) of the event source

Expand source code
@property
def event_source_arn(self) -> str:
    """The Amazon Resource Name (ARN) of the event source, read from the ``eventSourceArn`` key"""
    return self["eventSourceArn"]
var rmq_messages_by_queue : Dict[str, List[RabbitMessage]]
Expand source code
@property
def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
    """Return the per-queue ``RabbitMessage`` lists held in ``_rmq_messages_by_queue``."""
    return self._rmq_messages_by_queue

Inherited members

class RabbitMessage (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides a single, read-only access point to a wrapped dict

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class RabbitMessage(DictWrapper):
    """Read-only view over a single message entry of a RabbitMQ event."""

    @property
    def basic_properties(self) -> BasicProperties:
        """Wrap the ``basicProperties`` entry in a ``BasicProperties`` object."""
        raw_properties = self["basicProperties"]
        return BasicProperties(raw_properties)

    @property
    def redelivered(self) -> bool:
        """Return the value stored under the ``redelivered`` key."""
        flag = self["redelivered"]
        return flag

    @property
    def data(self) -> str:
        """Return the value stored under the ``data`` key (the still-encoded payload)."""
        payload = self["data"]
        return payload

    @property
    def decoded_data(self) -> str:
        """Return ``data`` after passing it through ``base64_decode``."""
        encoded = self.data
        return base64_decode(encoded)

    @property
    def json_data(self) -> Any:
        """Return ``decoded_data`` parsed with the configured JSON deserializer.

        The parsed value is kept on the instance, so later accesses reuse it
        as long as it is not ``None``.
        """
        if self._json_data is not None:
            return self._json_data
        # Nothing cached yet: parse once and remember the result.
        self._json_data = self._json_deserializer(self.decoded_data)
        return self._json_data

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var basic_properties : BasicProperties
Expand source code
@property
def basic_properties(self) -> BasicProperties:
    """Wrap the ``basicProperties`` entry in a ``BasicProperties`` object."""
    raw_properties = self["basicProperties"]
    return BasicProperties(raw_properties)
var data : str
Expand source code
@property
def data(self) -> str:
    """Return the value stored under the ``data`` key."""
    return self["data"]
var decoded_data : str

Decodes the data as a str

Expand source code
@property
def decoded_data(self) -> str:
    """Return ``data`` after passing it through ``base64_decode``."""
    encoded = self.data
    return base64_decode(encoded)
var json_data : Any

Parses the data as json

Expand source code
@property
def json_data(self) -> Any:
    """Return ``decoded_data`` parsed with the configured JSON deserializer.

    The parsed value is kept on the instance, so later accesses reuse it
    as long as it is not ``None``.
    """
    if self._json_data is not None:
        return self._json_data
    # Nothing cached yet: parse once and remember the result.
    self._json_data = self._json_deserializer(self.decoded_data)
    return self._json_data
var redelivered : bool
Expand source code
@property
def redelivered(self) -> bool:
    """Return the value stored under the ``redelivered`` key."""
    return self["redelivered"]

Inherited members