Module aws_lambda_powertools.utilities.data_classes.kafka_event
Expand source code
import base64
from typing import Any, Callable, Dict, Iterator, List, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.shared_functions import (
    get_header_value,
)
class KafkaEventRecord(DictWrapper):
    """Read-only view over a single record of a Lambda Kafka event.

    Plain properties read straight from the wrapped record dictionary; the
    ``decoded_*`` and ``json_value`` helpers convert the transport encoding
    (base64 text, lists of byte values) into Python objects.
    """

    @property
    def topic(self) -> str:
        """The Kafka topic."""
        return self["topic"]

    @property
    def partition(self) -> int:
        """The Kafka record partition."""
        return self["partition"]

    @property
    def offset(self) -> int:
        """The Kafka record offset."""
        return self["offset"]

    @property
    def timestamp(self) -> int:
        """The Kafka record timestamp."""
        return self["timestamp"]

    @property
    def timestamp_type(self) -> str:
        """The Kafka record timestamp type."""
        return self["timestampType"]

    @property
    def key(self) -> Optional[str]:
        """The raw (base64 encoded) Kafka record key.

        Kafka record keys are optional, so this is ``None`` when the record
        carries no ``key`` entry instead of raising ``KeyError``.
        """
        return self.get("key")

    @property
    def decoded_key(self) -> Optional[bytes]:
        """Decode the base64 encoded key as bytes, or ``None`` when there is no key."""
        raw_key = self.key
        if raw_key is None:
            return None
        return base64.b64decode(raw_key)

    @property
    def value(self) -> str:
        """The raw (base64 encoded) Kafka record value."""
        return self["value"]

    @property
    def decoded_value(self) -> bytes:
        """Decodes the base64 encoded value as bytes."""
        return base64.b64decode(self.value)

    @property
    def json_value(self) -> Any:
        """Decodes the text encoded data as JSON.

        The result is cached in ``self._json_data`` so the deserializer runs
        only until it has produced a non-``None`` value.
        """
        if self._json_data is None:
            self._json_data = self._json_deserializer(self.decoded_value.decode("utf-8"))
        return self._json_data

    @property
    def headers(self) -> List[Dict[str, List[int]]]:
        """The raw Kafka record headers."""
        return self["headers"]

    @property
    def decoded_headers(self) -> Dict[str, bytes]:
        """Decodes the headers as a single dictionary.

        Each header entry maps a name to a list of byte values; when a name
        appears in more than one entry the last occurrence wins.
        """
        return {k: bytes(v) for chunk in self.headers for k, v in chunk.items()}

    def get_header_value(
        self,
        name: str,
        default_value: Optional[Any] = None,
        case_sensitive: bool = True,
    ) -> Optional[bytes]:
        """Get a decoded header value by name.

        Parameters
        ----------
        name : str
            Header name to look up in ``decoded_headers``.
        default_value : Any, optional
            Passed through to the shared ``get_header_value`` helper
            (presumably returned when the header is absent).
        case_sensitive : bool
            Passed through to the helper; defaults to ``True``.
        """
        return get_header_value(
            headers=self.decoded_headers,
            name=name,
            default_value=default_value,
            case_sensitive=case_sensitive,
        )
class KafkaEvent(DictWrapper):
    """Self-managed or MSK Apache Kafka event trigger

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None):
        """
        Parameters
        ----------
        data : Dict[str, Any]
            Lambda Event Source Event payload
        json_deserializer : Callable, optional
            Forwarded to ``DictWrapper`` and, through ``records``, to every
            ``KafkaEventRecord``. ``None`` keeps the base-class default.
        """
        super().__init__(data, json_deserializer=json_deserializer)
        # Lazily created iterator backing the ``record`` property.
        self._records: Optional[Iterator[KafkaEventRecord]] = None

    @property
    def event_source(self) -> str:
        """The AWS service from which the Kafka event record originated."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> Optional[str]:
        """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
        return self.get("eventSourceArn")

    @property
    def bootstrap_servers(self) -> str:
        """The Kafka bootstrap URL."""
        return self["bootstrapServers"]

    @property
    def decoded_bootstrap_servers(self) -> List[str]:
        """The decoded Kafka bootstrap URL."""
        return self.bootstrap_servers.split(",")

    @property
    def records(self) -> Iterator[KafkaEventRecord]:
        """The Kafka records.

        Each access returns a fresh generator that walks every list under
        the ``records`` mapping and wraps each entry in a ``KafkaEventRecord``.
        """
        for chunk in self["records"].values():
            for record in chunk:
                yield KafkaEventRecord(data=record, json_deserializer=self._json_deserializer)

    @property
    def record(self) -> KafkaEventRecord:
        """
        Returns the next Kafka record using an iterator.

        The iterator is created on first access and kept on the instance, so
        successive accesses advance through the records.

        Returns
        -------
        KafkaEventRecord
            The next Kafka record.

        Raises
        ------
        StopIteration
            If there are no more records available.

        """
        if self._records is None:
            self._records = self.records
        return next(self._records)
Classes
class KafkaEvent (data: Dict[str, Any])
-
Self-managed or MSK Apache Kafka event trigger. Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
Parameters
data : Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer : Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default `json.loads`
Expand source code
class KafkaEvent(DictWrapper):
    """Self-managed or MSK Apache Kafka event trigger

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None):
        """
        Parameters
        ----------
        data : Dict[str, Any]
            Lambda Event Source Event payload
        json_deserializer : Callable, optional
            Forwarded to ``DictWrapper`` and, through ``records``, to every
            ``KafkaEventRecord``. ``None`` keeps the base-class default.
        """
        super().__init__(data, json_deserializer=json_deserializer)
        # Lazily created iterator backing the ``record`` property.
        self._records: Optional[Iterator[KafkaEventRecord]] = None

    @property
    def event_source(self) -> str:
        """The AWS service from which the Kafka event record originated."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> Optional[str]:
        """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
        return self.get("eventSourceArn")

    @property
    def bootstrap_servers(self) -> str:
        """The Kafka bootstrap URL."""
        return self["bootstrapServers"]

    @property
    def decoded_bootstrap_servers(self) -> List[str]:
        """The decoded Kafka bootstrap URL."""
        return self.bootstrap_servers.split(",")

    @property
    def records(self) -> Iterator[KafkaEventRecord]:
        """The Kafka records.

        Each access returns a fresh generator that walks every list under
        the ``records`` mapping and wraps each entry in a ``KafkaEventRecord``.
        """
        for chunk in self["records"].values():
            for record in chunk:
                yield KafkaEventRecord(data=record, json_deserializer=self._json_deserializer)

    @property
    def record(self) -> KafkaEventRecord:
        """
        Returns the next Kafka record using an iterator.

        The iterator is created on first access and kept on the instance, so
        successive accesses advance through the records.

        Returns
        -------
        KafkaEventRecord
            The next Kafka record.

        Raises
        ------
        StopIteration
            If there are no more records available.

        """
        if self._records is None:
            self._records = self.records
        return next(self._records)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var bootstrap_servers : str
-
The Kafka bootstrap URL.
Expand source code
@property
def bootstrap_servers(self) -> str:
    """Return the raw ``bootstrapServers`` entry of the event payload."""
    servers = self["bootstrapServers"]
    return servers
var decoded_bootstrap_servers : List[str]
-
The decoded Kafka bootstrap URL.
Expand source code
@property
def decoded_bootstrap_servers(self) -> List[str]:
    """Return the bootstrap servers as a list, split on commas."""
    raw_servers = self.bootstrap_servers
    return raw_servers.split(",")
var event_source : str
-
The AWS service from which the Kafka event record originated.
Expand source code
@property
def event_source(self) -> str:
    """Return the ``eventSource`` entry: the AWS service that produced the event."""
    source = self["eventSource"]
    return source
var event_source_arn : Optional[str]
-
The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.
Expand source code
@property
def event_source_arn(self) -> Optional[str]:
    """Return the ``eventSourceArn`` entry, or ``None`` when the payload has none.

    The ARN is mandatory for AWS MSK events.
    """
    arn = self.get("eventSourceArn")
    return arn
var record : KafkaEventRecord
-
Returns the next Kafka record using an iterator.
Returns
KafkaEventRecord
- The next Kafka record.
Raises
StopIteration
- If there are no more records available.
Expand source code
@property
def record(self) -> KafkaEventRecord:
    """
    Return the next Kafka record from an iterator kept on the instance.

    The iterator is taken from ``self.records`` on first access and reused
    afterwards, so each access advances by one record.

    Returns
    -------
    KafkaEventRecord
        The next Kafka record.

    Raises
    ------
    StopIteration
        If there are no more records available.

    """
    pending = self._records
    if pending is None:
        pending = self._records = self.records
    return next(pending)
var records : Iterator[KafkaEventRecord]
-
The Kafka records.
Expand source code
@property
def records(self) -> Iterator[KafkaEventRecord]:
    """Lazily yield every Kafka record in the event.

    Walks each list stored under the ``records`` mapping and wraps every
    entry in a ``KafkaEventRecord`` sharing this event's JSON deserializer.
    """
    yield from (
        KafkaEventRecord(data=item, json_deserializer=self._json_deserializer)
        for batch in self["records"].values()
        for item in batch
    )
Inherited members
class KafkaEventRecord (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides read-only access to a wrapped dict
Parameters
data : Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer : Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default `json.loads`
Expand source code
class KafkaEventRecord(DictWrapper):
    """Read-only view over a single record of a Lambda Kafka event.

    Plain properties read straight from the wrapped record dictionary; the
    ``decoded_*`` and ``json_value`` helpers convert the transport encoding
    (base64 text, lists of byte values) into Python objects.
    """

    @property
    def topic(self) -> str:
        """The Kafka topic."""
        return self["topic"]

    @property
    def partition(self) -> int:
        """The Kafka record partition."""
        return self["partition"]

    @property
    def offset(self) -> int:
        """The Kafka record offset."""
        return self["offset"]

    @property
    def timestamp(self) -> int:
        """The Kafka record timestamp."""
        return self["timestamp"]

    @property
    def timestamp_type(self) -> str:
        """The Kafka record timestamp type."""
        return self["timestampType"]

    @property
    def key(self) -> Optional[str]:
        """The raw (base64 encoded) Kafka record key.

        Kafka record keys are optional, so this is ``None`` when the record
        carries no ``key`` entry instead of raising ``KeyError``.
        """
        return self.get("key")

    @property
    def decoded_key(self) -> Optional[bytes]:
        """Decode the base64 encoded key as bytes, or ``None`` when there is no key."""
        raw_key = self.key
        if raw_key is None:
            return None
        return base64.b64decode(raw_key)

    @property
    def value(self) -> str:
        """The raw (base64 encoded) Kafka record value."""
        return self["value"]

    @property
    def decoded_value(self) -> bytes:
        """Decodes the base64 encoded value as bytes."""
        return base64.b64decode(self.value)

    @property
    def json_value(self) -> Any:
        """Decodes the text encoded data as JSON.

        The result is cached in ``self._json_data`` so the deserializer runs
        only until it has produced a non-``None`` value.
        """
        if self._json_data is None:
            self._json_data = self._json_deserializer(self.decoded_value.decode("utf-8"))
        return self._json_data

    @property
    def headers(self) -> List[Dict[str, List[int]]]:
        """The raw Kafka record headers."""
        return self["headers"]

    @property
    def decoded_headers(self) -> Dict[str, bytes]:
        """Decodes the headers as a single dictionary.

        Each header entry maps a name to a list of byte values; when a name
        appears in more than one entry the last occurrence wins.
        """
        return {k: bytes(v) for chunk in self.headers for k, v in chunk.items()}

    def get_header_value(
        self,
        name: str,
        default_value: Optional[Any] = None,
        case_sensitive: bool = True,
    ) -> Optional[bytes]:
        """Get a decoded header value by name.

        Parameters
        ----------
        name : str
            Header name to look up in ``decoded_headers``.
        default_value : Any, optional
            Passed through to the shared ``get_header_value`` helper
            (presumably returned when the header is absent).
        case_sensitive : bool
            Passed through to the helper; defaults to ``True``.
        """
        return get_header_value(
            headers=self.decoded_headers,
            name=name,
            default_value=default_value,
            case_sensitive=case_sensitive,
        )
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var decoded_headers : Dict[str, bytes]
-
Decodes the headers as a single dictionary.
Expand source code
@property
def decoded_headers(self) -> Dict[str, bytes]:
    """Merge all header entries into one dictionary of name -> bytes.

    Every entry of ``self.headers`` maps header names to lists of byte
    values; a name seen again later overwrites the earlier value.
    """
    merged: Dict[str, bytes] = {}
    for header in self.headers:
        for name, byte_values in header.items():
            merged[name] = bytes(byte_values)
    return merged
var decoded_key : bytes
-
Decode the base64 encoded key as bytes.
Expand source code
@property
def decoded_key(self) -> Optional[bytes]:
    """Decode the base64 encoded key as bytes.

    Kafka record keys are optional; when ``self.key`` is ``None`` this
    returns ``None`` rather than failing inside ``base64.b64decode``.
    """
    raw_key = self.key
    if raw_key is None:
        return None
    return base64.b64decode(raw_key)
var decoded_value : bytes
-
Decodes the base64 encoded value as bytes.
Expand source code
@property
def decoded_value(self) -> bytes:
    """Return the record value with its base64 encoding removed."""
    encoded = self.value
    return base64.b64decode(encoded)
var headers : List[Dict[str, List[int]]]
-
The raw Kafka record headers.
Expand source code
@property
def headers(self) -> List[Dict[str, List[int]]]:
    """Return the ``headers`` entry of the record exactly as delivered."""
    raw_headers = self["headers"]
    return raw_headers
var json_value : Any
-
Decodes the text encoded data as JSON.
Expand source code
@property
def json_value(self) -> Any:
    """Return the record value parsed as JSON.

    The decoded value is interpreted as UTF-8 text and handed to the
    configured deserializer; the result is cached in ``self._json_data``
    and reused while it is not ``None``.
    """
    if self._json_data is not None:
        return self._json_data
    text = self.decoded_value.decode("utf-8")
    self._json_data = self._json_deserializer(text)
    return self._json_data
var key : str
-
The raw (base64 encoded) Kafka record key.
Expand source code
@property
def key(self) -> Optional[str]:
    """The raw (base64 encoded) Kafka record key.

    Kafka record keys are optional, so this is ``None`` when the record
    carries no ``key`` entry instead of raising ``KeyError``.
    """
    return self.get("key")
var offset : str
-
The Kafka record offset.
Expand source code
@property
def offset(self) -> int:
    """The Kafka record offset (delivered as an integer in the Lambda payload)."""
    return self["offset"]
var partition : str
-
The Kafka record partition.
Expand source code
@property
def partition(self) -> int:
    """The Kafka record partition (delivered as an integer in the Lambda payload)."""
    return self["partition"]
var timestamp : int
-
The Kafka record timestamp.
Expand source code
@property
def timestamp(self) -> int:
    """Return the ``timestamp`` entry of the record."""
    stamp = self["timestamp"]
    return stamp
var timestamp_type : str
-
The Kafka record timestamp type.
Expand source code
@property
def timestamp_type(self) -> str:
    """Return the ``timestampType`` entry of the record."""
    kind = self["timestampType"]
    return kind
var topic : str
-
The Kafka topic.
Expand source code
@property
def topic(self) -> str:
    """Return the ``topic`` entry of the record."""
    name = self["topic"]
    return name
var value : str
-
The raw (base64 encoded) Kafka record value.
Expand source code
@property
def value(self) -> str:
    """Return the ``value`` entry of the record, still base64 encoded."""
    encoded = self["value"]
    return encoded
Methods
def get_header_value(self, name: str, default_value: Optional[Any] = None, case_sensitive: bool = True) ‑> Optional[str]
-
Get a decoded header value by name.
Expand source code
def get_header_value(
    self,
    name: str,
    default_value: Optional[Any] = None,
    case_sensitive: bool = True,
) -> Optional[bytes]:
    """Get a decoded header value by name.

    The lookup runs against ``decoded_headers``, whose values are ``bytes``,
    so a found header is returned as bytes rather than text.

    Parameters
    ----------
    name : str
        Header name to look up.
    default_value : Any, optional
        Passed through to the shared ``get_header_value`` helper
        (presumably returned when the header is absent).
    case_sensitive : bool
        Passed through to the helper; defaults to ``True``.
    """
    return get_header_value(
        headers=self.decoded_headers,
        name=name,
        default_value=default_value,
        case_sensitive=case_sensitive,
    )
Inherited members