Module aws_lambda_powertools.utilities.data_classes.kafka_event
Expand source code
import base64
from typing import Any, Dict, Iterator, List, Optional
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
class KafkaEventRecord(DictWrapper):
@property
def topic(self) -> str:
"""The Kafka topic."""
return self["topic"]
@property
def partition(self) -> str:
"""The Kafka record parition."""
return self["partition"]
@property
def offset(self) -> str:
"""The Kafka record offset."""
return self["offset"]
@property
def timestamp(self) -> int:
"""The Kafka record timestamp."""
return self["timestamp"]
@property
def timestamp_type(self) -> str:
"""The Kafka record timestamp type."""
return self["timestampType"]
@property
def key(self) -> str:
"""The raw (base64 encoded) Kafka record key."""
return self["key"]
@property
def decoded_key(self) -> bytes:
"""Decode the base64 encoded key as bytes."""
return base64.b64decode(self.key)
@property
def value(self) -> str:
"""The raw (base64 encoded) Kafka record value."""
return self["value"]
@property
def decoded_value(self) -> bytes:
"""Decodes the base64 encoded value as bytes."""
return base64.b64decode(self.value)
@property
def json_value(self) -> Any:
"""Decodes the text encoded data as JSON."""
if self._json_data is None:
self._json_data = self._json_deserializer(self.decoded_value.decode("utf-8"))
return self._json_data
@property
def headers(self) -> List[Dict[str, List[int]]]:
"""The raw Kafka record headers."""
return self["headers"]
@property
def decoded_headers(self) -> Dict[str, bytes]:
"""Decodes the headers as a single dictionary."""
return {k: bytes(v) for chunk in self.headers for k, v in chunk.items()}
def get_header_value(
self, name: str, default_value: Optional[Any] = None, case_sensitive: bool = True
) -> Optional[bytes]:
"""Get a decoded header value by name."""
if case_sensitive:
return self.decoded_headers.get(name, default_value)
name_lower = name.lower()
return next(
# Iterate over the dict and do a case-insensitive key comparison
(value for key, value in self.decoded_headers.items() if key.lower() == name_lower),
# Default value is returned if no matches was found
default_value,
)
class KafkaEvent(DictWrapper):
"""Self-managed or MSK Apache Kafka event trigger
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
"""
@property
def event_source(self) -> str:
"""The AWS service from which the Kafka event record originated."""
return self["eventSource"]
@property
def event_source_arn(self) -> Optional[str]:
"""The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
return self.get("eventSourceArn")
@property
def bootstrap_servers(self) -> str:
"""The Kafka bootstrap URL."""
return self["bootstrapServers"]
@property
def decoded_bootstrap_servers(self) -> List[str]:
"""The decoded Kafka bootstrap URL."""
return self.bootstrap_servers.split(",")
@property
def records(self) -> Iterator[KafkaEventRecord]:
"""The Kafka records."""
for chunk in self["records"].values():
for record in chunk:
yield KafkaEventRecord(data=record, json_deserializer=self._json_deserializer)
@property
def record(self) -> KafkaEventRecord:
"""The next Kafka record."""
return next(self.records)
Classes
class KafkaEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Self-managed or MSK Apache Kafka event trigger Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KafkaEvent(DictWrapper): """Self-managed or MSK Apache Kafka event trigger Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html """ @property def event_source(self) -> str: """The AWS service from which the Kafka event record originated.""" return self["eventSource"] @property def event_source_arn(self) -> Optional[str]: """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.""" return self.get("eventSourceArn") @property def bootstrap_servers(self) -> str: """The Kafka bootstrap URL.""" return self["bootstrapServers"] @property def decoded_bootstrap_servers(self) -> List[str]: """The decoded Kafka bootstrap URL.""" return self.bootstrap_servers.split(",") @property def records(self) -> Iterator[KafkaEventRecord]: """The Kafka records.""" for chunk in self["records"].values(): for record in chunk: yield KafkaEventRecord(data=record, json_deserializer=self._json_deserializer) @property def record(self) -> KafkaEventRecord: """The next Kafka record.""" return next(self.records)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var bootstrap_servers : str
-
The Kafka bootstrap URL.
Expand source code
@property def bootstrap_servers(self) -> str: """The Kafka bootstrap URL.""" return self["bootstrapServers"]
var decoded_bootstrap_servers : List[str]
-
The decoded Kafka bootstrap URL.
Expand source code
@property def decoded_bootstrap_servers(self) -> List[str]: """The decoded Kafka bootstrap URL.""" return self.bootstrap_servers.split(",")
var event_source : str
-
The AWS service from which the Kafka event record originated.
Expand source code
@property def event_source(self) -> str: """The AWS service from which the Kafka event record originated.""" return self["eventSource"]
var event_source_arn : Optional[str]
-
The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.
Expand source code
@property def event_source_arn(self) -> Optional[str]: """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.""" return self.get("eventSourceArn")
var record : KafkaEventRecord
-
The next Kafka record.
Expand source code
@property def record(self) -> KafkaEventRecord: """The next Kafka record.""" return next(self.records)
var records : Iterator[KafkaEventRecord]
-
The Kafka records.
Expand source code
@property def records(self) -> Iterator[KafkaEventRecord]: """The Kafka records.""" for chunk in self["records"].values(): for record in chunk: yield KafkaEventRecord(data=record, json_deserializer=self._json_deserializer)
Inherited members
class KafkaEventRecord (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KafkaEventRecord(DictWrapper): @property def topic(self) -> str: """The Kafka topic.""" return self["topic"] @property def partition(self) -> str: """The Kafka record parition.""" return self["partition"] @property def offset(self) -> str: """The Kafka record offset.""" return self["offset"] @property def timestamp(self) -> int: """The Kafka record timestamp.""" return self["timestamp"] @property def timestamp_type(self) -> str: """The Kafka record timestamp type.""" return self["timestampType"] @property def key(self) -> str: """The raw (base64 encoded) Kafka record key.""" return self["key"] @property def decoded_key(self) -> bytes: """Decode the base64 encoded key as bytes.""" return base64.b64decode(self.key) @property def value(self) -> str: """The raw (base64 encoded) Kafka record value.""" return self["value"] @property def decoded_value(self) -> bytes: """Decodes the base64 encoded value as bytes.""" return base64.b64decode(self.value) @property def json_value(self) -> Any: """Decodes the text encoded data as JSON.""" if self._json_data is None: self._json_data = self._json_deserializer(self.decoded_value.decode("utf-8")) return self._json_data @property def headers(self) -> List[Dict[str, List[int]]]: """The raw Kafka record headers.""" return self["headers"] @property def decoded_headers(self) -> Dict[str, bytes]: """Decodes the headers as a single dictionary.""" return {k: bytes(v) for chunk in self.headers for k, v in chunk.items()} def get_header_value( self, name: str, default_value: Optional[Any] = None, case_sensitive: bool = True ) -> Optional[bytes]: """Get a decoded header value by name.""" if case_sensitive: return self.decoded_headers.get(name, default_value) name_lower = name.lower() return next( # Iterate over the dict and do a case-insensitive key comparison (value for key, value in self.decoded_headers.items() if key.lower() == name_lower), # Default value is returned if no matches was found default_value, )
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var decoded_headers : Dict[str, bytes]
-
Decodes the headers as a single dictionary.
Expand source code
@property def decoded_headers(self) -> Dict[str, bytes]: """Decodes the headers as a single dictionary.""" return {k: bytes(v) for chunk in self.headers for k, v in chunk.items()}
var decoded_key : bytes
-
Decode the base64 encoded key as bytes.
Expand source code
@property def decoded_key(self) -> bytes: """Decode the base64 encoded key as bytes.""" return base64.b64decode(self.key)
var decoded_value : bytes
-
Decodes the base64 encoded value as bytes.
Expand source code
@property def decoded_value(self) -> bytes: """Decodes the base64 encoded value as bytes.""" return base64.b64decode(self.value)
var headers : List[Dict[str, List[int]]]
-
The raw Kafka record headers.
Expand source code
@property def headers(self) -> List[Dict[str, List[int]]]: """The raw Kafka record headers.""" return self["headers"]
var json_value : Any
-
Decodes the text encoded data as JSON.
Expand source code
@property def json_value(self) -> Any: """Decodes the text encoded data as JSON.""" if self._json_data is None: self._json_data = self._json_deserializer(self.decoded_value.decode("utf-8")) return self._json_data
var key : str
-
The raw (base64 encoded) Kafka record key.
Expand source code
@property def key(self) -> str: """The raw (base64 encoded) Kafka record key.""" return self["key"]
var offset : str
-
The Kafka record offset.
Expand source code
@property def offset(self) -> str: """The Kafka record offset.""" return self["offset"]
var partition : str
-
The Kafka record parition.
Expand source code
@property def partition(self) -> str: """The Kafka record parition.""" return self["partition"]
var timestamp : int
-
The Kafka record timestamp.
Expand source code
@property def timestamp(self) -> int: """The Kafka record timestamp.""" return self["timestamp"]
var timestamp_type : str
-
The Kafka record timestamp type.
Expand source code
@property def timestamp_type(self) -> str: """The Kafka record timestamp type.""" return self["timestampType"]
var topic : str
-
The Kafka topic.
Expand source code
@property def topic(self) -> str: """The Kafka topic.""" return self["topic"]
var value : str
-
The raw (base64 encoded) Kafka record value.
Expand source code
@property def value(self) -> str: """The raw (base64 encoded) Kafka record value.""" return self["value"]
Methods
def get_header_value(self, name: str, default_value: Optional[Any] = None, case_sensitive: bool = True) ‑> Optional[bytes]
-
Get a decoded header value by name.
Expand source code
def get_header_value( self, name: str, default_value: Optional[Any] = None, case_sensitive: bool = True ) -> Optional[bytes]: """Get a decoded header value by name.""" if case_sensitive: return self.decoded_headers.get(name, default_value) name_lower = name.lower() return next( # Iterate over the dict and do a case-insensitive key comparison (value for key, value in self.decoded_headers.items() if key.lower() == name_lower), # Default value is returned if no matches was found default_value, )
Inherited members