Module aws_lambda_powertools.utilities.data_classes.kafka_event
Classes
class KafkaEvent (data: dict[str, Any])
-
Self-managed or MSK Apache Kafka event trigger. Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional - function to deserialize a
str
,
bytes
, or
bytearray
containing a JSON document into a Python object
; by default json.loads
Expand source code
class KafkaEvent(DictWrapper):
    """Self-managed or MSK Apache Kafka event trigger

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    def __init__(self, data: dict[str, Any]):
        super().__init__(data)
        # Lazily-created iterator backing the `record` property.
        self._records: Iterator[KafkaEventRecord] | None = None

    @property
    def event_source(self) -> str:
        """The AWS service from which the Kafka event record originated."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str | None:
        """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
        return self.get("eventSourceArn")

    @property
    def bootstrap_servers(self) -> str:
        """The Kafka bootstrap URL."""
        return self["bootstrapServers"]

    @property
    def decoded_bootstrap_servers(self) -> list[str]:
        """The decoded Kafka bootstrap URL, split on commas."""
        raw = self.bootstrap_servers
        return raw.split(",")

    @property
    def records(self) -> Iterator[KafkaEventRecord]:
        """Yield every Kafka record across all topic-partition batches."""
        for batch in self["records"].values():
            yield from (
                KafkaEventRecord(data=item, json_deserializer=self._json_deserializer)
                for item in batch
            )

    @property
    def record(self) -> KafkaEventRecord:
        """Return the next Kafka record from a cached iterator.

        Returns
        -------
        KafkaEventRecord
            The next Kafka record.

        Raises
        ------
        StopIteration
            If there are no more records available.
        """
        iterator = self._records
        if iterator is None:
            iterator = self.records
            self._records = iterator
        return next(iterator)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop bootstrap_servers : str
-
The Kafka bootstrap URL.
Expand source code
@property
def bootstrap_servers(self) -> str:
    """Raw Kafka bootstrap servers string as provided in the event payload."""
    servers: str = self["bootstrapServers"]
    return servers
prop decoded_bootstrap_servers : list[str]
-
The decoded Kafka bootstrap URL.
Expand source code
@property
def decoded_bootstrap_servers(self) -> list[str]:
    """Bootstrap servers string split on commas into a list of endpoints."""
    raw = self.bootstrap_servers
    return raw.split(",")
prop event_source : str
-
The AWS service from which the Kafka event record originated.
Expand source code
@property
def event_source(self) -> str:
    """Name of the AWS service from which this Kafka event record originated."""
    source: str = self["eventSource"]
    return source
prop event_source_arn : str | None
-
The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.
Expand source code
@property def event_source_arn(self) -> str | None: """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.""" return self.get("eventSourceArn")
prop record : KafkaEventRecord
-
Returns the next Kafka record using an iterator.
Returns
KafkaEventRecord
- The next Kafka record.
Raises
StopIteration
- If there are no more records available.
Expand source code
@property
def record(self) -> KafkaEventRecord:
    """
    Return the next Kafka record, advancing a cached iterator.

    Returns
    -------
    KafkaEventRecord
        The next Kafka record.

    Raises
    ------
    StopIteration
        If there are no more records available.
    """
    iterator = self._records
    if iterator is None:
        # First access: materialize the records generator once and reuse it.
        iterator = self.records
        self._records = iterator
    return next(iterator)
prop records : Iterator[KafkaEventRecord]
-
The Kafka records.
Expand source code
@property
def records(self) -> Iterator[KafkaEventRecord]:
    """Yield every Kafka record across all topic-partition batches."""
    for batch in self["records"].values():
        yield from (
            KafkaEventRecord(data=item, json_deserializer=self._json_deserializer)
            for item in batch
        )
Inherited members
class KafkaEventRecord (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read-only access to a wrapped dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional - function to deserialize a
str
,
bytes
, or
bytearray
containing a JSON document into a Python object
; by default json.loads
Expand source code
class KafkaEventRecord(DictWrapper):
    """A single Kafka record within a self-managed or MSK Apache Kafka event."""

    @property
    def topic(self) -> str:
        """The Kafka topic."""
        return self["topic"]

    @property
    def partition(self) -> int:
        """The Kafka record partition."""
        return self["partition"]

    @property
    def offset(self) -> int:
        """The Kafka record offset."""
        return self["offset"]

    @property
    def timestamp(self) -> int:
        """The Kafka record timestamp."""
        return self["timestamp"]

    @property
    def timestamp_type(self) -> str:
        """The Kafka record timestamp type."""
        return self["timestampType"]

    @property
    def key(self) -> str:
        """The raw (base64 encoded) Kafka record key."""
        return self["key"]

    @property
    def decoded_key(self) -> bytes:
        """Decode the base64 encoded key as bytes."""
        return base64.b64decode(self.key)

    @property
    def value(self) -> str:
        """The raw (base64 encoded) Kafka record value."""
        return self["value"]

    @property
    def decoded_value(self) -> bytes:
        """Decodes the base64 encoded value as bytes."""
        return base64.b64decode(self.value)

    @cached_property
    def json_value(self) -> Any:
        """Decodes the text encoded data as JSON."""
        return self._json_deserializer(self.decoded_value.decode("utf-8"))

    @property
    def headers(self) -> list[dict[str, list[int]]]:
        """The raw Kafka record headers."""
        return self["headers"]

    @cached_property
    def decoded_headers(self) -> dict[str, bytes]:
        """Decodes the headers as a single case-insensitive dictionary."""
        return CaseInsensitiveDict((k, bytes(v)) for chunk in self.headers for k, v in chunk.items())
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
var decoded_headers
-
Decodes the headers as a single dictionary.
Expand source code
# NOTE(review): this is the stdlib functools.cached_property.__get__ body, expanded
# into the page by the doc generator — presumably not part of this module; verify.
def __get__(self, instance, owner=None):
    # Accessed on the class itself: return the descriptor unchanged.
    if instance is None:
        return self
    # __set_name__ must have run so we know which attribute name to cache under.
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        # First access: compute once and store in the instance dict so future
        # lookups bypass this descriptor entirely.
        val = self.func(instance)
        try:
            cache[self.attrname] = val
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
    return val
prop decoded_key : bytes
-
Decode the base64 encoded key as bytes.
Expand source code
@property
def decoded_key(self) -> bytes:
    """Base64-decode the record key into bytes."""
    encoded = self.key
    return base64.b64decode(encoded)
prop decoded_value : bytes
-
Decodes the base64 encoded value as bytes.
Expand source code
@property
def decoded_value(self) -> bytes:
    """Base64-decode the record value into bytes."""
    encoded = self.value
    return base64.b64decode(encoded)
prop headers : list[dict[str, list[int]]]
-
The raw Kafka record headers.
Expand source code
@property
def headers(self) -> list[dict[str, list[int]]]:
    """Raw Kafka record headers as provided in the event payload."""
    raw: list[dict[str, list[int]]] = self["headers"]
    return raw
var json_value
-
Decodes the text encoded data as JSON.
Expand source code
# NOTE(review): this is the stdlib functools.cached_property.__get__ body, expanded
# into the page by the doc generator — presumably not part of this module; verify.
def __get__(self, instance, owner=None):
    # Accessed on the class itself: return the descriptor unchanged.
    if instance is None:
        return self
    # __set_name__ must have run so we know which attribute name to cache under.
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        # First access: compute once and store in the instance dict so future
        # lookups bypass this descriptor entirely.
        val = self.func(instance)
        try:
            cache[self.attrname] = val
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
    return val
prop key : str
-
The raw (base64 encoded) Kafka record key.
Expand source code
@property
def key(self) -> str:
    """The raw Kafka record key, still base64 encoded."""
    encoded: str = self["key"]
    return encoded
prop offset : int
-
The Kafka record offset.
Expand source code
@property
def offset(self) -> int:
    """Offset of this record within its Kafka partition."""
    position: int = self["offset"]
    return position
prop partition : int
-
The Kafka record partition.
Expand source code
@property
def partition(self) -> int:
    """The Kafka record partition."""  # fixed docstring typo: "parition"
    return self["partition"]
prop timestamp : int
-
The Kafka record timestamp.
Expand source code
@property
def timestamp(self) -> int:
    """Timestamp attached to this Kafka record."""
    ts: int = self["timestamp"]
    return ts
prop timestamp_type : str
-
The Kafka record timestamp type.
Expand source code
@property
def timestamp_type(self) -> str:
    """Kind of timestamp carried by this Kafka record."""
    ts_type: str = self["timestampType"]
    return ts_type
prop topic : str
-
The Kafka topic.
Expand source code
@property
def topic(self) -> str:
    """Kafka topic this record was published to."""
    name: str = self["topic"]
    return name
prop value : str
-
The raw (base64 encoded) Kafka record value.
Expand source code
@property
def value(self) -> str:
    """The raw Kafka record value, still base64 encoded."""
    encoded: str = self["value"]
    return encoded
Inherited members