Module aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event
Expand source code
import base64
from typing import Iterator, Optional
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
class KinesisFirehoseRecordMetadata(DictWrapper):
@property
def _metadata(self) -> dict:
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
return self["kinesisRecordMetadata"] # could raise KeyError
@property
def shard_id(self) -> str:
"""Kinesis stream shard ID; present only when Kinesis Stream is source"""
return self._metadata["shardId"]
@property
def partition_key(self) -> str:
"""Kinesis stream partition key; present only when Kinesis Stream is source"""
return self._metadata["partitionKey"]
@property
def approximate_arrival_timestamp(self) -> int:
"""Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
return self._metadata["approximateArrivalTimestamp"]
@property
def sequence_number(self) -> str:
"""Kinesis stream sequence number; present only when Kinesis Stream is source"""
return self._metadata["sequenceNumber"]
@property
def subsequence_number(self) -> str:
"""Kinesis stream sub-sequence number; present only when Kinesis Stream is source
Note: this will only be present for Kinesis streams using record aggregation
"""
return self._metadata["subsequenceNumber"]
class KinesisFirehoseRecord(DictWrapper):
@property
def approximate_arrival_timestamp(self) -> int:
"""The approximate time that the record was inserted into the delivery stream"""
return self["approximateArrivalTimestamp"]
@property
def record_id(self) -> str:
"""Record ID; uniquely identifies this record within the current batch"""
return self["recordId"]
@property
def data(self) -> str:
"""The data blob, base64-encoded"""
return self["data"]
@property
def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]:
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None
@property
def data_as_bytes(self) -> bytes:
"""Decoded base64-encoded data as bytes"""
return base64.b64decode(self.data)
@property
def data_as_text(self) -> str:
"""Decoded base64-encoded data as text"""
return self.data_as_bytes.decode("utf-8")
@property
def data_as_json(self) -> dict:
"""Decoded base64-encoded data loaded to json"""
if self._json_data is None:
self._json_data = self._json_deserializer(self.data_as_text)
return self._json_data
class KinesisFirehoseEvent(DictWrapper):
"""Kinesis Data Firehose event
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
"""
@property
def invocation_id(self) -> str:
"""Unique ID for for Lambda invocation"""
return self["invocationId"]
@property
def delivery_stream_arn(self) -> str:
"""ARN of the Firehose Data Firehose Delivery Stream"""
return self["deliveryStreamArn"]
@property
def source_kinesis_stream_arn(self) -> Optional[str]:
"""ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
return self.get("sourceKinesisStreamArn")
@property
def region(self) -> str:
"""AWS region where the event originated eg: us-east-1"""
return self["region"]
@property
def records(self) -> Iterator[KinesisFirehoseRecord]:
for record in self["records"]:
yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)
Classes
class KinesisFirehoseEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Kinesis Data Firehose event
Documentation:
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KinesisFirehoseEvent(DictWrapper): """Kinesis Data Firehose event Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html """ @property def invocation_id(self) -> str: """Unique ID for for Lambda invocation""" return self["invocationId"] @property def delivery_stream_arn(self) -> str: """ARN of the Firehose Data Firehose Delivery Stream""" return self["deliveryStreamArn"] @property def source_kinesis_stream_arn(self) -> Optional[str]: """ARN of the Kinesis Stream; present only when Kinesis Stream is source""" return self.get("sourceKinesisStreamArn") @property def region(self) -> str: """AWS region where the event originated eg: us-east-1""" return self["region"] @property def records(self) -> Iterator[KinesisFirehoseRecord]: for record in self["records"]: yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var delivery_stream_arn : str
-
ARN of the Firehose Data Firehose Delivery Stream
Expand source code
@property def delivery_stream_arn(self) -> str: """ARN of the Firehose Data Firehose Delivery Stream""" return self["deliveryStreamArn"]
var invocation_id : str
-
Unique ID for for Lambda invocation
Expand source code
@property def invocation_id(self) -> str: """Unique ID for for Lambda invocation""" return self["invocationId"]
var records : Iterator[KinesisFirehoseRecord]
-
Expand source code
@property def records(self) -> Iterator[KinesisFirehoseRecord]: for record in self["records"]: yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)
var region : str
-
AWS region where the event originated eg: us-east-1
Expand source code
@property def region(self) -> str: """AWS region where the event originated eg: us-east-1""" return self["region"]
var source_kinesis_stream_arn : Optional[str]
-
ARN of the Kinesis Stream; present only when Kinesis Stream is source
Expand source code
@property def source_kinesis_stream_arn(self) -> Optional[str]: """ARN of the Kinesis Stream; present only when Kinesis Stream is source""" return self.get("sourceKinesisStreamArn")
Inherited members
class KinesisFirehoseRecord (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KinesisFirehoseRecord(DictWrapper): @property def approximate_arrival_timestamp(self) -> int: """The approximate time that the record was inserted into the delivery stream""" return self["approximateArrivalTimestamp"] @property def record_id(self) -> str: """Record ID; uniquely identifies this record within the current batch""" return self["recordId"] @property def data(self) -> str: """The data blob, base64-encoded""" return self["data"] @property def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]: """Optional: metadata associated with this record; present only when Kinesis Stream is source""" return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None @property def data_as_bytes(self) -> bytes: """Decoded base64-encoded data as bytes""" return base64.b64decode(self.data) @property def data_as_text(self) -> str: """Decoded base64-encoded data as text""" return self.data_as_bytes.decode("utf-8") @property def data_as_json(self) -> dict: """Decoded base64-encoded data loaded to json""" if self._json_data is None: self._json_data = self._json_deserializer(self.data_as_text) return self._json_data
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var approximate_arrival_timestamp : int
-
The approximate time that the record was inserted into the delivery stream
Expand source code
@property def approximate_arrival_timestamp(self) -> int: """The approximate time that the record was inserted into the delivery stream""" return self["approximateArrivalTimestamp"]
var data : str
-
The data blob, base64-encoded
Expand source code
@property def data(self) -> str: """The data blob, base64-encoded""" return self["data"]
var data_as_bytes : bytes
-
Decoded base64-encoded data as bytes
Expand source code
@property def data_as_bytes(self) -> bytes: """Decoded base64-encoded data as bytes""" return base64.b64decode(self.data)
var data_as_json : dict
-
Decoded base64-encoded data loaded to json
Expand source code
@property def data_as_json(self) -> dict: """Decoded base64-encoded data loaded to json""" if self._json_data is None: self._json_data = self._json_deserializer(self.data_as_text) return self._json_data
var data_as_text : str
-
Decoded base64-encoded data as text
Expand source code
@property def data_as_text(self) -> str: """Decoded base64-encoded data as text""" return self.data_as_bytes.decode("utf-8")
var metadata : Optional[KinesisFirehoseRecordMetadata]
-
Optional: metadata associated with this record; present only when Kinesis Stream is source
Expand source code
@property def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]: """Optional: metadata associated with this record; present only when Kinesis Stream is source""" return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None
var record_id : str
-
Record ID; uniquely identifies this record within the current batch
Expand source code
@property def record_id(self) -> str: """Record ID; uniquely identifies this record within the current batch""" return self["recordId"]
Inherited members
class KinesisFirehoseRecordMetadata (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KinesisFirehoseRecordMetadata(DictWrapper): @property def _metadata(self) -> dict: """Optional: metadata associated with this record; present only when Kinesis Stream is source""" return self["kinesisRecordMetadata"] # could raise KeyError @property def shard_id(self) -> str: """Kinesis stream shard ID; present only when Kinesis Stream is source""" return self._metadata["shardId"] @property def partition_key(self) -> str: """Kinesis stream partition key; present only when Kinesis Stream is source""" return self._metadata["partitionKey"] @property def approximate_arrival_timestamp(self) -> int: """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source""" return self._metadata["approximateArrivalTimestamp"] @property def sequence_number(self) -> str: """Kinesis stream sequence number; present only when Kinesis Stream is source""" return self._metadata["sequenceNumber"] @property def subsequence_number(self) -> str: """Kinesis stream sub-sequence number; present only when Kinesis Stream is source Note: this will only be present for Kinesis streams using record aggregation """ return self._metadata["subsequenceNumber"]
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var approximate_arrival_timestamp : int
-
Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source
Expand source code
@property def approximate_arrival_timestamp(self) -> int: """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source""" return self._metadata["approximateArrivalTimestamp"]
var partition_key : str
-
Kinesis stream partition key; present only when Kinesis Stream is source
Expand source code
@property def partition_key(self) -> str: """Kinesis stream partition key; present only when Kinesis Stream is source""" return self._metadata["partitionKey"]
var sequence_number : str
-
Kinesis stream sequence number; present only when Kinesis Stream is source
Expand source code
@property def sequence_number(self) -> str: """Kinesis stream sequence number; present only when Kinesis Stream is source""" return self._metadata["sequenceNumber"]
var shard_id : str
-
Kinesis stream shard ID; present only when Kinesis Stream is source
Expand source code
@property def shard_id(self) -> str: """Kinesis stream shard ID; present only when Kinesis Stream is source""" return self._metadata["shardId"]
var subsequence_number : str
-
Kinesis stream sub-sequence number; present only when Kinesis Stream is source
Note: this will only be present for Kinesis streams using record aggregation
Expand source code
@property def subsequence_number(self) -> str: """Kinesis stream sub-sequence number; present only when Kinesis Stream is source Note: this will only be present for Kinesis streams using record aggregation """ return self._metadata["subsequenceNumber"]
Inherited members