Module aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event
Classes
class KinesisFirehoseDataTransformationRecord (record_id: str, result: "Literal['Ok', 'Dropped', 'ProcessingFailed']" = 'Ok', data: str = '', metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None, json_serializer: Callable = <function dumps>, json_deserializer: Callable = <function loads>)
-
Record in Kinesis Data Firehose response object.
Parameters
record_id
:str
- uniquely identifies this record within the current batch
result
:Literal["Ok", "Dropped", "ProcessingFailed"]
- record data transformation status, whether it succeeded, should be dropped, or failed.
data
:str
-
base64-encoded payload, by default empty string.
Use `data_from_text` or `data_from_json` methods to convert data if needed.
metadata
:KinesisFirehoseDataTransformationRecordMetadata | None
-
Metadata associated with this record; can contain partition keys.
See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
json_serializer
:Callable
- function to serialize `obj` to a JSON formatted `str`, by default json.dumps
json_deserializer
:Callable
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Documentation:
Expand source code
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationRecord:
    """Single record of a Kinesis Data Firehose data-transformation response.

    Parameters
    ----------
    record_id: str
        identifier of the record; unique within the current batch
    result: Literal["Ok", "Dropped", "ProcessingFailed"]
        outcome of the transformation: succeeded, should be dropped, or failed
    data: str
        base64-encoded payload, by default an empty string.
        A helper such as `base64_from_json` (from
        `aws_lambda_powertools.utilities.serialization`) can produce it.
    metadata: KinesisFirehoseDataTransformationRecordMetadata | None
        optional metadata for this record; can contain partition keys.
        See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    json_serializer: Callable
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer: Callable
        function to deserialize `str`, `bytes`, `bytearray` containing a JSON document
        to a Python `obj`, by default json.loads

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
    """

    # Values accepted for ``result``; anything else only triggers a warning in ``asdict``.
    _valid_result_types: ClassVar[tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed")

    record_id: str
    result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
    data: str = ""
    metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None
    json_serializer: Callable = json.dumps
    json_deserializer: Callable = json.loads

    def asdict(self) -> dict:
        """Return the record as a dict keyed ``recordId``/``result``/``data`` (+ ``metadata`` when set).

        An unknown ``result`` is reported through ``warnings.warn`` but still emitted as-is.
        """
        if self.result not in self._valid_result_types:
            warnings.warn(
                f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"',
                stacklevel=1,
            )

        payload: dict[str, Any] = {"recordId": self.record_id, "result": self.result, "data": self.data}
        if self.metadata:
            payload["metadata"] = self.metadata.asdict()
        return payload

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes (empty bytes when there is no data)"""
        return base64.b64decode(self.data) if self.data else b""

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as UTF-8 text (empty string when there is no data)"""
        return self.data_as_bytes.decode("utf-8") if self.data else ""

    @cached_property
    def data_as_json(self) -> dict:
        """Decoded base64-encoded data parsed with ``json_deserializer`` (empty dict when there is no data)"""
        return self.json_deserializer(self.data_as_text) if self.data else {}
Class variables
var data : str
var metadata : KinesisFirehoseDataTransformationRecordMetadata | None
var record_id : str
var result : Literal['Ok', 'Dropped', 'ProcessingFailed']
Instance variables
prop data_as_bytes : bytes
-
Decoded base64-encoded data as bytes
Expand source code
@property
def data_as_bytes(self) -> bytes:
    """Base64-decode ``self.data`` into bytes; empty bytes when ``data`` is empty."""
    return base64.b64decode(self.data) if self.data else b""
var data_as_json
-
Decoded base64-encoded data loaded to json
Expand source code
def __get__(self, instance, owner=None):
    """Return the cached value for ``instance``, computing and storing it on first access.

    Accessing the descriptor on the class (``instance is None``) returns the
    descriptor itself. Raises ``TypeError`` when ``attrname`` was never set,
    when ``instance`` has no ``__dict__``, or when that ``__dict__`` rejects
    item assignment.
    """
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    # Sentinel lookup distinguishes "not cached yet" from a cached falsy/None value.
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        val = self.func(instance)
        try:
            cache[self.attrname] = val
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
    return val
prop data_as_text : str
-
Decoded base64-encoded data as text
Expand source code
@property
def data_as_text(self) -> str:
    """UTF-8 decode ``self.data_as_bytes``; empty string when ``data`` is empty."""
    return self.data_as_bytes.decode("utf-8") if self.data else ""
Methods
def asdict(self) ‑> dict
def json_deserializer(s, *, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw)
-
Deserialize `s` (a `str`, `bytes` or `bytearray` instance containing a JSON document) to a Python object.
`object_hook` is an optional function that will be called with the result of any object literal decode (a `dict`). The return value of `object_hook` will be used instead of the `dict`. This feature can be used to implement custom decoders (e.g. JSON-RPC class hinting).
`object_pairs_hook` is an optional function that will be called with the result of any object literal decoded with an ordered list of pairs. The return value of `object_pairs_hook` will be used instead of the `dict`. This feature can be used to implement custom decoders. If `object_hook` is also defined, the `object_pairs_hook` takes priority.
`parse_float`, if specified, will be called with the string of every JSON float to be decoded. By default this is equivalent to float(num_str). This can be used to use another datatype or parser for JSON floats (e.g. decimal.Decimal).
`parse_int`, if specified, will be called with the string of every JSON int to be decoded. By default this is equivalent to int(num_str). This can be used to use another datatype or parser for JSON integers (e.g. float).
`parse_constant`, if specified, will be called with one of the following strings: -Infinity, Infinity, NaN. This can be used to raise an exception if invalid JSON numbers are encountered.
To use a custom `JSONDecoder` subclass, specify it with the `cls` kwarg; otherwise `JSONDecoder` is used.
def json_serializer(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, default=None, sort_keys=False, **kw)
-
Serialize `obj` to a JSON formatted `str`.
If `skipkeys` is true then `dict` keys that are not basic types (`str`, `int`, `float`, `bool`, `None`) will be skipped instead of raising a `TypeError`.
If `ensure_ascii` is false, then the return value can contain non-ASCII characters if they appear in strings contained in `obj`. Otherwise, all such characters are escaped in JSON strings.
If `check_circular` is false, then the circular reference check for container types will be skipped and a circular reference will result in a `RecursionError` (or worse).
If `allow_nan` is false, then it will be a `ValueError` to serialize out of range `float` values (`nan`, `inf`, `-inf`) in strict compliance of the JSON specification, instead of using the JavaScript equivalents (`NaN`, `Infinity`, `-Infinity`).
If `indent` is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. `None` is the most compact representation.
If specified, `separators` should be an `(item_separator, key_separator)` tuple. The default is `(', ', ': ')` if indent is `None` and `(',', ': ')` otherwise. To get the most compact JSON representation, you should specify `(',', ':')` to eliminate whitespace.
`default(obj)` is a function that should return a serializable version of obj or raise TypeError. The default simply raises TypeError.
If sort_keys is true (default: `False`), then the output of dictionaries will be sorted by key.
To use a custom `JSONEncoder` subclass (e.g. one that overrides the `.default()` method to serialize additional types), specify it with the `cls` kwarg; otherwise `JSONEncoder` is used.
class KinesisFirehoseDataTransformationRecordMetadata (partition_keys: dict[str, str] = <factory>)
-
Metadata in Firehose Data Transform Record.
Parameters
partition_keys
:dict[str, str]
- A dict of partition keys/value in string format, e.g.
{"year":"2023","month":"09"}
Documentation:
Expand source code
@dataclass(repr=False, order=False, frozen=True)
class KinesisFirehoseDataTransformationRecordMetadata:
    """
    Metadata in Firehose Data Transform Record.

    Parameters
    ----------
    partition_keys: dict[str, str]
        A dict of partition keys/value in string format, e.g. `{"year":"2023","month":"09"}`.
        Defaults to a fresh empty dict per instance.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    """

    # ``dict`` is itself the zero-argument factory; no lambda wrapper is needed.
    partition_keys: dict[str, str] = field(default_factory=dict)

    def asdict(self) -> dict:
        """Return ``{"partitionKeys": partition_keys}``, or ``{}`` when ``partition_keys`` is None."""
        if self.partition_keys is not None:
            return {"partitionKeys": self.partition_keys}
        return {}
Class variables
var partition_keys : dict[str, str]
Methods
def asdict(self) ‑> dict
class KinesisFirehoseDataTransformationResponse (records: list[KinesisFirehoseDataTransformationRecord] = <factory>)
-
Kinesis Data Firehose response object
Documentation:
Parameters
records
:list[KinesisFirehoseDataTransformationRecord]
- records of Kinesis Data Firehose response object,
optional parameter at start. can be added later using
add_record
function.
Examples
Transforming data records
from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, KinesisFirehoseEvent, ) from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext def lambda_handler(event: dict, context: LambdaContext): firehose_event = KinesisFirehoseEvent(event) result = KinesisFirehoseDataTransformationResponse() for record in firehose_event.records: payload = record.data_as_text # base64 decoded data as str ## generate data to return transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, data=base64_from_json(transformed_data), ) result.add_record(processed_record) # return transformed records return result.asdict()
Expand source code
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationResponse:
    """Kinesis Data Firehose response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

    Parameters
    ----------
    records : list[KinesisFirehoseDataTransformationRecord]
        records of the Kinesis Data Firehose response object; optional at
        construction time, more can be appended later with `add_record`.

    Examples
    --------

    **Transforming data records**

    ```python
    from aws_lambda_powertools.utilities.data_classes import (
        KinesisFirehoseDataTransformationRecord,
        KinesisFirehoseDataTransformationResponse,
        KinesisFirehoseEvent,
    )
    from aws_lambda_powertools.utilities.serialization import base64_from_json
    from aws_lambda_powertools.utilities.typing import LambdaContext


    def lambda_handler(event: dict, context: LambdaContext):
        firehose_event = KinesisFirehoseEvent(event)
        result = KinesisFirehoseDataTransformationResponse()

        for record in firehose_event.records:
            payload = record.data_as_text  # base64 decoded data as str

            ## generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )

            result.add_record(processed_record)

        # return transformed records
        return result.asdict()
    ```
    """

    records: list[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)

    def add_record(self, record: KinesisFirehoseDataTransformationRecord):
        """Append ``record`` to ``records``."""
        self.records.append(record)

    def asdict(self) -> dict:
        """Return ``{"records": [...]}`` with each record's ``asdict()``.

        Raises
        ------
        ValueError
            when no records have been added.
        """
        if self.records:
            return {"records": [item.asdict() for item in self.records]}
        raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")
Class variables
var records : list[KinesisFirehoseDataTransformationRecord]
Methods
def add_record(self, record: KinesisFirehoseDataTransformationRecord)
def asdict(self) ‑> dict
class KinesisFirehoseEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Kinesis Data Firehose event
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class KinesisFirehoseEvent(DictWrapper):
    """Kinesis Data Firehose event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    @property
    def invocation_id(self) -> str:
        """Unique ID for the Lambda invocation"""
        return self["invocationId"]

    @property
    def delivery_stream_arn(self) -> str:
        """ARN of the Kinesis Data Firehose delivery stream"""
        return self["deliveryStreamArn"]

    @property
    def source_kinesis_stream_arn(self) -> str | None:
        """ARN of the source Kinesis Stream; None unless a Kinesis Stream is the source"""
        return self.get("sourceKinesisStreamArn")

    @property
    def region(self) -> str:
        """AWS region where the event originated, e.g. us-east-1"""
        return self["region"]

    @property
    def records(self) -> Iterator[KinesisFirehoseRecord]:
        """Lazily yield each entry of ``records`` wrapped in a ``KinesisFirehoseRecord``"""
        for raw_record in self["records"]:
            yield KinesisFirehoseRecord(data=raw_record, json_deserializer=self._json_deserializer)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop delivery_stream_arn : str
-
ARN of the Kinesis Data Firehose delivery stream
Expand source code
@property
def delivery_stream_arn(self) -> str:
    """ARN of the Kinesis Data Firehose delivery stream (the ``deliveryStreamArn`` key)"""
    return self["deliveryStreamArn"]
prop invocation_id : str
-
Unique ID for the Lambda invocation
Expand source code
@property
def invocation_id(self) -> str:
    """Unique ID for the Lambda invocation (the ``invocationId`` key)"""
    return self["invocationId"]
prop records : Iterator[KinesisFirehoseRecord]
-
Expand source code
@property
def records(self) -> Iterator[KinesisFirehoseRecord]:
    """Lazily yield each entry of ``records`` wrapped in a ``KinesisFirehoseRecord``"""
    for raw_record in self["records"]:
        yield KinesisFirehoseRecord(data=raw_record, json_deserializer=self._json_deserializer)
prop region : str
-
AWS region where the event originated eg: us-east-1
Expand source code
@property
def region(self) -> str:
    """AWS region where the event originated, e.g. us-east-1 (the ``region`` key)"""
    return self["region"]
prop source_kinesis_stream_arn : str | None
-
ARN of the Kinesis Stream; present only when Kinesis Stream is source
Expand source code
@property
def source_kinesis_stream_arn(self) -> str | None:
    """ARN of the source Kinesis Stream; None unless a Kinesis Stream is the source"""
    return self.get("sourceKinesisStreamArn")
Inherited members
class KinesisFirehoseRecord (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class KinesisFirehoseRecord(DictWrapper):
    """A single record of a Kinesis Data Firehose event."""

    @property
    def approximate_arrival_timestamp(self) -> int:
        """The approximate time that the record was inserted into the delivery stream"""
        return self["approximateArrivalTimestamp"]

    @property
    def record_id(self) -> str:
        """Record ID; uniquely identifies this record within the current batch"""
        return self["recordId"]

    @property
    def data(self) -> str:
        """The data blob, base64-encoded"""
        return self["data"]

    @property
    def metadata(self) -> KinesisFirehoseRecordMetadata | None:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        if self.get("kinesisRecordMetadata"):
            # The wrapper receives the whole record dict and reads the nested key itself.
            return KinesisFirehoseRecordMetadata(self._data)
        return None

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        return self.data_as_bytes.decode("utf-8")

    @cached_property
    def data_as_json(self) -> dict:
        """Decoded base64-encoded data loaded to json"""
        return self._json_deserializer(self.data_as_text)

    def build_data_transformation_response(
        self,
        result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
        data: str = "",
        metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None,
    ) -> KinesisFirehoseDataTransformationRecord:
        """Create a KinesisFirehoseDataTransformationRecord reusing this record's record_id

        Parameters
        ----------
        result : Literal["Ok", "Dropped", "ProcessingFailed"]
            processing result, supported value: Ok, Dropped, ProcessingFailed
        data : str, optional
            data blob, base64-encoded, by default an empty string
        metadata: KinesisFirehoseDataTransformationRecordMetadata, optional
            Metadata associated with this record; can contain partition keys
            - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

        Returns
        -------
        KinesisFirehoseDataTransformationRecord
            new response record built from the given values
        """
        response_fields = {
            "record_id": self.record_id,
            "result": result,
            "data": data,
            "metadata": metadata,
        }
        return KinesisFirehoseDataTransformationRecord(**response_fields)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop approximate_arrival_timestamp : int
-
The approximate time that the record was inserted into the delivery stream
Expand source code
@property
def approximate_arrival_timestamp(self) -> int:
    """Approximate time the record was inserted into the delivery stream (``approximateArrivalTimestamp`` key)"""
    return self["approximateArrivalTimestamp"]
prop data : str
-
The data blob, base64-encoded
Expand source code
@property
def data(self) -> str:
    """The base64-encoded data blob (the ``data`` key)"""
    return self["data"]
prop data_as_bytes : bytes
-
Decoded base64-encoded data as bytes
Expand source code
@property
def data_as_bytes(self) -> bytes:
    """Base64-decode ``self.data`` into bytes"""
    encoded = self.data
    return base64.b64decode(encoded)
var data_as_json
-
Decoded base64-encoded data loaded to json
Expand source code
def __get__(self, instance, owner=None):
    """Return the cached value for ``instance``, computing and storing it on first access.

    Accessing the descriptor on the class (``instance is None``) returns the
    descriptor itself. Raises ``TypeError`` when ``attrname`` was never set,
    when ``instance`` has no ``__dict__``, or when that ``__dict__`` rejects
    item assignment.
    """
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    # Sentinel lookup distinguishes "not cached yet" from a cached falsy/None value.
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        val = self.func(instance)
        try:
            cache[self.attrname] = val
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
    return val
prop data_as_text : str
-
Decoded base64-encoded data as text
Expand source code
@property
def data_as_text(self) -> str:
    """UTF-8 decode ``self.data_as_bytes`` into text"""
    raw = self.data_as_bytes
    return raw.decode("utf-8")
prop metadata : KinesisFirehoseRecordMetadata | None
-
Optional: metadata associated with this record; present only when Kinesis Stream is source
Expand source code
@property
def metadata(self) -> KinesisFirehoseRecordMetadata | None:
    """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
    if self.get("kinesisRecordMetadata"):
        # The wrapper receives the whole record dict, not just the nested metadata.
        return KinesisFirehoseRecordMetadata(self._data)
    return None
prop record_id : str
-
Record ID; uniquely identifies this record within the current batch
Expand source code
@property
def record_id(self) -> str:
    """Record ID (the ``recordId`` key); uniquely identifies this record within the current batch"""
    return self["recordId"]
Methods
def build_data_transformation_response(self, result: "Literal['Ok', 'Dropped', 'ProcessingFailed']" = 'Ok', data: str = '', metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None)
-
Create a KinesisFirehoseDataTransformationRecord directly using the record_id and given values
Parameters
result
:Literal["Ok", "Dropped", "ProcessingFailed"]
- processing result, supported value: Ok, Dropped, ProcessingFailed
data
:str
, optional
- data blob, base64-encoded, optional at init. Allows passing in base64-encoded data directly, or using a helper function such as `data_from_text` or `data_from_json` to populate data
metadata
:KinesisFirehoseDataTransformationRecordMetadata
, optional
- Metadata associated with this record; can contain partition keys - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
Inherited members
class KinesisFirehoseRecordMetadata (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class KinesisFirehoseRecordMetadata(DictWrapper):
    """Accessors for the ``kinesisRecordMetadata`` section of a Firehose record."""

    @property
    def _metadata(self) -> dict:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        return self["kinesisRecordMetadata"]  # could raise KeyError

    @property
    def shard_id(self) -> str:
        """Kinesis stream shard ID; present only when Kinesis Stream is source"""
        return self._metadata["shardId"]

    @property
    def partition_key(self) -> str:
        """Kinesis stream partition key; present only when Kinesis Stream is source"""
        return self._metadata["partitionKey"]

    @property
    def approximate_arrival_timestamp(self) -> int:
        """Kinesis stream approximate arrival timestamp; present only when Kinesis Stream is source

        NOTE(review): annotated as ``int``, so presumably an epoch value rather
        than an ISO string - confirm against the event payload.
        """
        return self._metadata["approximateArrivalTimestamp"]

    @property
    def sequence_number(self) -> str:
        """Kinesis stream sequence number; present only when Kinesis Stream is source"""
        return self._metadata["sequenceNumber"]

    @property
    def subsequence_number(self) -> int:
        """Kinesis stream sub-sequence number; present only when Kinesis Stream is source

        Note: this will only be present for Kinesis streams using record aggregation
        """
        return self._metadata["subsequenceNumber"]
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop approximate_arrival_timestamp : int
-
Kinesis stream approximate arrival timestamp (an integer, per the return annotation); present only when Kinesis Stream is source
Expand source code
@property
def approximate_arrival_timestamp(self) -> int:
    """Kinesis stream approximate arrival timestamp; present only when Kinesis Stream is source

    NOTE(review): annotated as ``int``, so presumably an epoch value rather
    than an ISO string - confirm against the event payload.
    """
    return self._metadata["approximateArrivalTimestamp"]
prop partition_key : str
-
Kinesis stream partition key; present only when Kinesis Stream is source
Expand source code
@property
def partition_key(self) -> str:
    """Kinesis stream partition key (``partitionKey``); present only when Kinesis Stream is source"""
    return self._metadata["partitionKey"]
prop sequence_number : str
-
Kinesis stream sequence number; present only when Kinesis Stream is source
Expand source code
@property
def sequence_number(self) -> str:
    """Kinesis stream sequence number (``sequenceNumber``); present only when Kinesis Stream is source"""
    return self._metadata["sequenceNumber"]
prop shard_id : str
-
Kinesis stream shard ID; present only when Kinesis Stream is source
Expand source code
@property
def shard_id(self) -> str:
    """Kinesis stream shard ID (``shardId``); present only when Kinesis Stream is source"""
    return self._metadata["shardId"]
prop subsequence_number : int
-
Kinesis stream sub-sequence number; present only when Kinesis Stream is source
Note: this will only be present for Kinesis streams using record aggregation
Expand source code
@property
def subsequence_number(self) -> int:
    """Kinesis stream sub-sequence number (``subsequenceNumber``); present only when Kinesis Stream is source

    Note: this will only be present for Kinesis streams using record aggregation
    """
    return self._metadata["subsequenceNumber"]
Inherited members