Module aws_lambda_powertools.utilities.data_classes.kafka_event

Classes

class KafkaEvent (data: dict[str, Any])

Self-managed or MSK Apache Kafka event trigger Documentation:


Parameters

data : dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KafkaEvent(DictWrapper):
    """Self-managed or MSK Apache Kafka event trigger
    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    def __init__(self, data: dict[str, Any]):
        super().__init__(data)
        self._records: Iterator[KafkaEventRecord] | None = None

    @property
    def event_source(self) -> str:
        """The AWS service from which the Kafka event record originated."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str | None:
        """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
        return self.get("eventSourceArn")

    @property
    def bootstrap_servers(self) -> str:
        """The Kafka bootstrap URL."""
        return self["bootstrapServers"]

    @property
    def decoded_bootstrap_servers(self) -> list[str]:
        """The decoded Kafka bootstrap URL."""
        return self.bootstrap_servers.split(",")

    @property
    def records(self) -> Iterator[KafkaEventRecord]:
        """The Kafka records."""
        for chunk in self["records"].values():
            for record in chunk:
                yield KafkaEventRecord(data=record, json_deserializer=self._json_deserializer)

    @property
    def record(self) -> KafkaEventRecord:
        """
        Returns the next Kafka record using an iterator.

        Returns
        -------
        KafkaEventRecord
            The next Kafka record.

        Raises
        ------
        StopIteration
            If there are no more records available.

        """
        if self._records is None:
            self._records = self.records
        return next(self._records)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container
  • typing.Generic

Instance variables

prop bootstrap_servers : str

The Kafka bootstrap URL.

Expand source code
@property
def bootstrap_servers(self) -> str:
    """The Kafka bootstrap URL."""
    return self["bootstrapServers"]
prop decoded_bootstrap_servers : list[str]

The decoded Kafka bootstrap URL.

Expand source code
@property
def decoded_bootstrap_servers(self) -> list[str]:
    """The decoded Kafka bootstrap URL."""
    return self.bootstrap_servers.split(",")
prop event_source : str

The AWS service from which the Kafka event record originated.

Expand source code
@property
def event_source(self) -> str:
    """The AWS service from which the Kafka event record originated."""
    return self["eventSource"]
prop event_source_arn : str | None

The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.

Expand source code
@property
def event_source_arn(self) -> str | None:
    """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
    return self.get("eventSourceArn")
prop recordKafkaEventRecord

Returns the next Kafka record using an iterator.

Returns

KafkaEventRecord
The next Kafka record.

Raises

StopIteration
If there are no more records available.
Expand source code
@property
def record(self) -> KafkaEventRecord:
    """
    Returns the next Kafka record using an iterator.

    Returns
    -------
    KafkaEventRecord
        The next Kafka record.

    Raises
    ------
    StopIteration
        If there are no more records available.

    """
    if self._records is None:
        self._records = self.records
    return next(self._records)
prop records : Iterator[KafkaEventRecord]

The Kafka records.

Expand source code
@property
def records(self) -> Iterator[KafkaEventRecord]:
    """The Kafka records."""
    for chunk in self["records"].values():
        for record in chunk:
            yield KafkaEventRecord(data=record, json_deserializer=self._json_deserializer)

Inherited members

class KafkaEventRecord (data: dict[str, Any], json_deserializer: Callable | None = None)

Provides a single read only access to a wrapper dict

Parameters

data : dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KafkaEventRecord(DictWrapper):
    @property
    def topic(self) -> str:
        """The Kafka topic."""
        return self["topic"]

    @property
    def partition(self) -> int:
        """The Kafka record parition."""
        return self["partition"]

    @property
    def offset(self) -> int:
        """The Kafka record offset."""
        return self["offset"]

    @property
    def timestamp(self) -> int:
        """The Kafka record timestamp."""
        return self["timestamp"]

    @property
    def timestamp_type(self) -> str:
        """The Kafka record timestamp type."""
        return self["timestampType"]

    @property
    def key(self) -> str:
        """The raw (base64 encoded) Kafka record key."""
        return self["key"]

    @property
    def decoded_key(self) -> bytes:
        """Decode the base64 encoded key as bytes."""
        return base64.b64decode(self.key)

    @property
    def value(self) -> str:
        """The raw (base64 encoded) Kafka record value."""
        return self["value"]

    @property
    def decoded_value(self) -> bytes:
        """Decodes the base64 encoded value as bytes."""
        return base64.b64decode(self.value)

    @cached_property
    def json_value(self) -> Any:
        """Decodes the text encoded data as JSON."""
        return self._json_deserializer(self.decoded_value.decode("utf-8"))

    @property
    def headers(self) -> list[dict[str, list[int]]]:
        """The raw Kafka record headers."""
        return self["headers"]

    @cached_property
    def decoded_headers(self) -> dict[str, bytes]:
        """Decodes the headers as a single dictionary."""
        return CaseInsensitiveDict((k, bytes(v)) for chunk in self.headers for k, v in chunk.items())

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container
  • typing.Generic

Instance variables

var decoded_headers

Decodes the headers as a single dictionary.

Expand source code
def __get__(self, instance, owner=None):
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        val = self.func(instance)
        try:
            cache[self.attrname] = val
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
    return val
prop decoded_key : bytes

Decode the base64 encoded key as bytes.

Expand source code
@property
def decoded_key(self) -> bytes:
    """Decode the base64 encoded key as bytes."""
    return base64.b64decode(self.key)
prop decoded_value : bytes

Decodes the base64 encoded value as bytes.

Expand source code
@property
def decoded_value(self) -> bytes:
    """Decodes the base64 encoded value as bytes."""
    return base64.b64decode(self.value)
prop headers : list[dict[str, list[int]]]

The raw Kafka record headers.

Expand source code
@property
def headers(self) -> list[dict[str, list[int]]]:
    """The raw Kafka record headers."""
    return self["headers"]
var json_value

Decodes the text encoded data as JSON.

Expand source code
def __get__(self, instance, owner=None):
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        val = self.func(instance)
        try:
            cache[self.attrname] = val
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
    return val
prop key : str

The raw (base64 encoded) Kafka record key.

Expand source code
@property
def key(self) -> str:
    """The raw (base64 encoded) Kafka record key."""
    return self["key"]
prop offset : int

The Kafka record offset.

Expand source code
@property
def offset(self) -> int:
    """The Kafka record offset."""
    return self["offset"]
prop partition : int

The Kafka record parition.

Expand source code
@property
def partition(self) -> int:
    """The Kafka record parition."""
    return self["partition"]
prop timestamp : int

The Kafka record timestamp.

Expand source code
@property
def timestamp(self) -> int:
    """The Kafka record timestamp."""
    return self["timestamp"]
prop timestamp_type : str

The Kafka record timestamp type.

Expand source code
@property
def timestamp_type(self) -> str:
    """The Kafka record timestamp type."""
    return self["timestampType"]
prop topic : str

The Kafka topic.

Expand source code
@property
def topic(self) -> str:
    """The Kafka topic."""
    return self["topic"]
prop value : str

The raw (base64 encoded) Kafka record value.

Expand source code
@property
def value(self) -> str:
    """The raw (base64 encoded) Kafka record value."""
    return self["value"]

Inherited members