Module aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event

Classes

class KinesisFirehoseDataTransformationRecord (record_id: str, result: Literal['Ok', 'Dropped', 'ProcessingFailed'] = 'Ok', data: str = '', metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None, json_serializer: Callable = <function dumps>, json_deserializer: Callable = <function loads>)

Record in Kinesis Data Firehose response object.

Parameters

record_id : str
uniquely identifies this record within the current batch
result : Literal["Ok", "Dropped", "ProcessingFailed"]
record data transformation status, whether it succeeded, should be dropped, or failed.
data : str

base64-encoded payload, by default empty string.

Use data_from_text or data_from_json methods to convert data if needed.

metadata : Optional[KinesisFirehoseDataTransformationRecordMetadata]

Metadata associated with this record; can contain partition keys.

See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

json_serializer : Callable
function to serialize obj to a JSON formatted str, by default json.dumps
json_deserializer : Callable
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads

Documentation:

- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Expand source code
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationRecord:
    """Record in Kinesis Data Firehose response object.

    Parameters
    ----------
    record_id: str
        uniquely identifies this record within the current batch
    result: Literal["Ok", "Dropped", "ProcessingFailed"]
        record data transformation status, whether it succeeded, should be dropped, or failed.
    data: str
        base64-encoded payload, by default empty string.

        Use `data_from_text` or `data_from_json` methods to convert data if needed.

    metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata]
        Metadata associated with this record; can contain partition keys.

        See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    json_serializer: Callable
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer: Callable
        function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`,
        by default json.loads

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
    """

    _valid_result_types: ClassVar[Tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed")

    record_id: str
    result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
    data: str = ""
    metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None
    json_serializer: Callable = json.dumps
    json_deserializer: Callable = json.loads

    def asdict(self) -> Dict:
        if self.result not in self._valid_result_types:
            warnings.warn(
                stacklevel=1,
                message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"',
            )

        record: Dict[str, Any] = {
            "recordId": self.record_id,
            "result": self.result,
            "data": self.data,
        }
        if self.metadata:
            record["metadata"] = self.metadata.asdict()
        return record

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        if not self.data:
            return b""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        if not self.data:
            return ""
        return self.data_as_bytes.decode("utf-8")

    @cached_property
    def data_as_json(self) -> Dict:
        """Decoded base64-encoded data loaded to json"""
        if not self.data:
            return {}

        return self.json_deserializer(self.data_as_text)
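
Below is a minimal, illustrative sketch of how a transformation record can be built and inspected; the record ID is hypothetical and the payload is base64-encoded with the standard library.

import base64
import json

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
)

payload = {"tool_used": "powertools_dataclass"}

record = KinesisFirehoseDataTransformationRecord(
    record_id="49546986683135544286507457936321625675700192471156785154",  # hypothetical record ID
    result="Ok",
    data=base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8"),
)

record.data_as_text  # decoded payload as str
record.data_as_json  # decoded payload parsed with json_deserializer
record.asdict()      # {"recordId": "4954...", "result": "Ok", "data": "eyJ0b29s..."}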

Class variables

var data : str
var metadata : Optional[KinesisFirehoseDataTransformationRecordMetadata]
var record_id : str
var result : Literal['Ok', 'Dropped', 'ProcessingFailed']

Instance variables

prop data_as_bytes : bytes

Decoded base64-encoded data as bytes

Expand source code
@property
def data_as_bytes(self) -> bytes:
    """Decoded base64-encoded data as bytes"""
    if not self.data:
        return b""
    return base64.b64decode(self.data)
var data_as_json

Decoded base64-encoded data loaded to json

Expand source code
@cached_property
def data_as_json(self) -> Dict:
    """Decoded base64-encoded data loaded to json"""
    if not self.data:
        return {}

    return self.json_deserializer(self.data_as_text)
prop data_as_text : str

Decoded base64-encoded data as text

Expand source code
@property
def data_as_text(self) -> str:
    """Decoded base64-encoded data as text"""
    if not self.data:
        return ""
    return self.data_as_bytes.decode("utf-8")

Methods

def asdict(self) ‑> Dict
def json_deserializer(s, *, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw) ‑> Callable

Deserialize s (a str, bytes or bytearray instance containing a JSON document) to a Python object.

object_hook is an optional function that will be called with the result of any object literal decode (a dict). The return value of object_hook will be used instead of the dict. This feature can be used to implement custom decoders (e.g. JSON-RPC class hinting).

object_pairs_hook is an optional function that will be called with the result of any object literal decoded with an ordered list of pairs. The return value of object_pairs_hook will be used instead of the dict. This feature can be used to implement custom decoders. If object_hook is also defined, the object_pairs_hook takes priority.

parse_float, if specified, will be called with the string of every JSON float to be decoded. By default this is equivalent to float(num_str). This can be used to use another datatype or parser for JSON floats (e.g. decimal.Decimal).

parse_int, if specified, will be called with the string of every JSON int to be decoded. By default this is equivalent to int(num_str). This can be used to use another datatype or parser for JSON integers (e.g. float).

parse_constant, if specified, will be called with one of the following strings: -Infinity, Infinity, NaN. This can be used to raise an exception if invalid JSON numbers are encountered.

To use a custom JSONDecoder subclass, specify it with the cls kwarg; otherwise JSONDecoder is used.

def json_serializer(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, default=None, sort_keys=False, **kw) ‑> Callable

Serialize obj to a JSON formatted str.

If skipkeys is true then dict keys that are not basic types (str, int, float, bool, None) will be skipped instead of raising a TypeError.

If ensure_ascii is false, then the return value can contain non-ASCII characters if they appear in strings contained in obj. Otherwise, all such characters are escaped in JSON strings.

If check_circular is false, then the circular reference check for container types will be skipped and a circular reference will result in a RecursionError (or worse).

If allow_nan is false, then it will be a ValueError to serialize out of range float values (nan, inf, -inf) in strict compliance of the JSON specification, instead of using the JavaScript equivalents (NaN, Infinity, -Infinity).

If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.

If specified, separators should be an (item_separator, key_separator) tuple. The default is (', ', ': ') if indent is None and (',', ': ') otherwise. To get the most compact JSON representation, you should specify (',', ':') to eliminate whitespace.

default(obj) is a function that should return a serializable version of obj or raise TypeError. The default simply raises TypeError.

If sort_keys is true (default: False), then the output of dictionaries will be sorted by key.

To use a custom JSONEncoder subclass (e.g. one that overrides the .default() method to serialize additional types), specify it with the cls kwarg; otherwise JSONEncoder is used.
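
Both callables can be swapped when constructing a KinesisFirehoseDataTransformationRecord. A hedged sketch, assuming you want JSON floats parsed as Decimal when reading data_as_json (the record ID and payload are made up):

import base64
import json
from decimal import Decimal
from functools import partial

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
)

record = KinesisFirehoseDataTransformationRecord(
    record_id="49546986683135544286507457936321625675700192471156785154",  # hypothetical
    data=base64.b64encode(b'{"price": 10.50}').decode("utf-8"),
    json_deserializer=partial(json.loads, parse_float=Decimal),
)

record.data_as_json  # {"price": Decimal("10.50")}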

class KinesisFirehoseDataTransformationRecordMetadata (partition_keys: Dict[str, str] = <factory>)

Metadata in Firehose Data Transform Record.

Parameters

partition_keys : Dict[str, str]
A dict of partition key/value pairs in string format, e.g. {"year":"2023","month":"09"}

Documentation:

- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

Expand source code
@dataclass(repr=False, order=False, frozen=True)
class KinesisFirehoseDataTransformationRecordMetadata:
    """
    Metadata in Firehose Data Transform Record.

    Parameters
    ----------
    partition_keys: Dict[str, str]
        A dict of partition keys/value in string format, e.g. `{"year":"2023","month":"09"}`

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    """

    partition_keys: Dict[str, str] = field(default_factory=lambda: {})

    def asdict(self) -> Dict:
        if self.partition_keys is not None:
            return {"partitionKeys": self.partition_keys}
        return {}
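
A short sketch of attaching partition keys to a transformation record (the record ID and payload are hypothetical); asdict nests them under the partitionKeys key expected by Firehose dynamic partitioning.

import base64

from aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationRecordMetadata,
)

metadata = KinesisFirehoseDataTransformationRecordMetadata(
    partition_keys={"year": "2023", "month": "09"},
)

record = KinesisFirehoseDataTransformationRecord(
    record_id="49546986683135544286507457936321625675700192471156785154",  # hypothetical
    data=base64.b64encode(b'{"year": "2023", "month": "09"}').decode("utf-8"),
    metadata=metadata,
)

record.asdict()["metadata"]  # {"partitionKeys": {"year": "2023", "month": "09"}}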

Class variables

var partition_keys : Dict[str, str]

Methods

def asdict(self) ‑> Dict
class KinesisFirehoseDataTransformationResponse (records: List[KinesisFirehoseDataTransformationRecord] = <factory>)

Kinesis Data Firehose response object

Documentation:

- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Parameters

records : List[KinesisFirehoseDataTransformationRecord]
records of the Kinesis Data Firehose response object; optional at initialization and can be added later with the add_record method.

Examples

Transforming data records

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: dict, context: LambdaContext):
    firehose_event = KinesisFirehoseEvent(event)
    result = KinesisFirehoseDataTransformationResponse()

    for record in firehose_event.records:
        payload = record.data_as_text  # base64 decoded data as str

        ## generate data to return
        transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
        processed_record = KinesisFirehoseDataTransformationRecord(
            record_id=record.record_id,
            data=base64_from_json(transformed_data),
        )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()
Expand source code
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationResponse:
    """Kinesis Data Firehose response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

    Parameters
    ----------
    records : List[KinesisFirehoseResponseRecord]
        records of Kinesis Data Firehose response object,
        optional parameter at start. can be added later using `add_record` function.

    Examples
    --------

    **Transforming data records**

    ```python
    from aws_lambda_powertools.utilities.data_classes import (
        KinesisFirehoseDataTransformationRecord,
        KinesisFirehoseDataTransformationResponse,
        KinesisFirehoseEvent,
    )
    from aws_lambda_powertools.utilities.serialization import base64_from_json
    from aws_lambda_powertools.utilities.typing import LambdaContext


    def lambda_handler(event: dict, context: LambdaContext):
        firehose_event = KinesisFirehoseEvent(event)
        result = KinesisFirehoseDataTransformationResponse()

        for record in firehose_event.records:
            payload = record.data_as_text  # base64 decoded data as str

            ## generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )

            result.add_record(processed_record)

        # return transformed records
        return result.asdict()
    ```
    """

    records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)

    def add_record(self, record: KinesisFirehoseDataTransformationRecord):
        self.records.append(record)

    def asdict(self) -> Dict:
        if not self.records:
            raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")

        return {"records": [record.asdict() for record in self.records]}

Class variables

var records : List[KinesisFirehoseDataTransformationRecord]

Methods

def add_record(self, record: KinesisFirehoseDataTransformationRecord)
def asdict(self) ‑> Dict
class KinesisFirehoseEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Kinesis Data Firehose event

Documentation:

- https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KinesisFirehoseEvent(DictWrapper):
    """Kinesis Data Firehose event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    @property
    def invocation_id(self) -> str:
        """Unique ID for for Lambda invocation"""
        return self["invocationId"]

    @property
    def delivery_stream_arn(self) -> str:
        """ARN of the Firehose Data Firehose Delivery Stream"""
        return self["deliveryStreamArn"]

    @property
    def source_kinesis_stream_arn(self) -> Optional[str]:
        """ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
        return self.get("sourceKinesisStreamArn")

    @property
    def region(self) -> str:
        """AWS region where the event originated eg: us-east-1"""
        return self["region"]

    @property
    def records(self) -> Iterator[KinesisFirehoseRecord]:
        for record in self["records"]:
            yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)
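
A minimal handler sketch built only from the pieces documented on this page; it assumes record payloads are UTF-8 text and simply upper-cases them before re-encoding.

import base64

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: dict, context: LambdaContext):
    firehose_event = KinesisFirehoseEvent(event)
    result = KinesisFirehoseDataTransformationResponse()

    for record in firehose_event.records:
        transformed = record.data_as_text.upper()  # example transformation
        result.add_record(
            record.build_data_transformation_response(
                data=base64.b64encode(transformed.encode("utf-8")).decode("utf-8"),
            ),
        )

    return result.asdict()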

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

prop delivery_stream_arn : str

ARN of the Kinesis Data Firehose delivery stream

Expand source code
@property
def delivery_stream_arn(self) -> str:
    """ARN of the Firehose Data Firehose Delivery Stream"""
    return self["deliveryStreamArn"]
prop invocation_id : str

Unique ID for Lambda invocation

Expand source code
@property
def invocation_id(self) -> str:
    """Unique ID for for Lambda invocation"""
    return self["invocationId"]
prop records : Iterator[KinesisFirehoseRecord]
Expand source code
@property
def records(self) -> Iterator[KinesisFirehoseRecord]:
    for record in self["records"]:
        yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)
prop region : str

AWS region where the event originated eg: us-east-1

Expand source code
@property
def region(self) -> str:
    """AWS region where the event originated eg: us-east-1"""
    return self["region"]
prop source_kinesis_stream_arn : Optional[str]

ARN of the Kinesis Stream; present only when Kinesis Stream is source

Expand source code
@property
def source_kinesis_stream_arn(self) -> Optional[str]:
    """ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
    return self.get("sourceKinesisStreamArn")

Inherited members

class KinesisFirehoseRecord (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides read-only access to a wrapped dict

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KinesisFirehoseRecord(DictWrapper):
    @property
    def approximate_arrival_timestamp(self) -> int:
        """The approximate time that the record was inserted into the delivery stream"""
        return self["approximateArrivalTimestamp"]

    @property
    def record_id(self) -> str:
        """Record ID; uniquely identifies this record within the current batch"""
        return self["recordId"]

    @property
    def data(self) -> str:
        """The data blob, base64-encoded"""
        return self["data"]

    @property
    def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        return self.data_as_bytes.decode("utf-8")

    @cached_property
    def data_as_json(self) -> dict:
        """Decoded base64-encoded data loaded to json"""
        return self._json_deserializer(self.data_as_text)

    def build_data_transformation_response(
        self,
        result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
        data: str = "",
        metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None,
    ) -> KinesisFirehoseDataTransformationRecord:
        """Create a KinesisFirehoseResponseRecord directly using the record_id and given values

        Parameters
        ----------
        result : Literal["Ok", "Dropped", "ProcessingFailed"]
            processing result, supported value: Ok, Dropped, ProcessingFailed
        data : str, optional
            data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or
            use either function like `data_from_text`, `data_from_json` to populate data
        metadata: KinesisFirehoseResponseRecordMetadata, optional
            Metadata associated with this record; can contain partition keys
            - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
        """
        return KinesisFirehoseDataTransformationRecord(
            record_id=self.record_id,
            result=result,
            data=data,
            metadata=metadata,
        )
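
A sketch showing how a single incoming record can be inspected; the event fragment below is hypothetical.

from aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event import KinesisFirehoseRecord

record = KinesisFirehoseRecord(
    {
        "recordId": "49546986683135544286507457936321625675700192471156785154",  # hypothetical
        "approximateArrivalTimestamp": 1664028820148,
        "data": "eyJrZXkiOiAidmFsdWUifQ==",  # base64 of '{"key": "value"}'
    },
)

record.record_id
record.approximate_arrival_timestamp
record.data_as_text  # '{"key": "value"}'
record.data_as_json  # {"key": "value"}
record.metadata      # None unless "kinesisRecordMetadata" is present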

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

prop approximate_arrival_timestamp : int

The approximate time that the record was inserted into the delivery stream

Expand source code
@property
def approximate_arrival_timestamp(self) -> int:
    """The approximate time that the record was inserted into the delivery stream"""
    return self["approximateArrivalTimestamp"]
prop data : str

The data blob, base64-encoded

Expand source code
@property
def data(self) -> str:
    """The data blob, base64-encoded"""
    return self["data"]
prop data_as_bytes : bytes

Decoded base64-encoded data as bytes

Expand source code
@property
def data_as_bytes(self) -> bytes:
    """Decoded base64-encoded data as bytes"""
    return base64.b64decode(self.data)
var data_as_json

Decoded base64-encoded data loaded to json

Expand source code
@cached_property
def data_as_json(self) -> dict:
    """Decoded base64-encoded data loaded to json"""
    return self._json_deserializer(self.data_as_text)
prop data_as_text : str

Decoded base64-encoded data as text

Expand source code
@property
def data_as_text(self) -> str:
    """Decoded base64-encoded data as text"""
    return self.data_as_bytes.decode("utf-8")
prop metadata : Optional[KinesisFirehoseRecordMetadata]

Optional: metadata associated with this record; present only when Kinesis Stream is source

Expand source code
@property
def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]:
    """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
    return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None
prop record_id : str

Record ID; uniquely identifies this record within the current batch

Expand source code
@property
def record_id(self) -> str:
    """Record ID; uniquely identifies this record within the current batch"""
    return self["recordId"]

Methods

def build_data_transformation_response(self, result: Literal['Ok', 'Dropped', 'ProcessingFailed'] = 'Ok', data: str = '', metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None) ‑> KinesisFirehoseDataTransformationRecord

Create a KinesisFirehoseDataTransformationRecord directly using the record_id and given values

Parameters

result : Literal["Ok", "Dropped", "ProcessingFailed"]
processing result; supported values: Ok, Dropped, ProcessingFailed
data : str, optional
base64-encoded data blob, optional at initialization; pass base64-encoded data directly, or use data_from_text or data_from_json to populate it
metadata : KinesisFirehoseDataTransformationRecordMetadata, optional
Metadata associated with this record; can contain partition keys. See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
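
A self-contained, hedged example of building a response record from an incoming record while attaching partition keys; the event fragment is hypothetical and the payload is passed through unchanged.

from aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event import (
    KinesisFirehoseDataTransformationRecordMetadata,
    KinesisFirehoseRecord,
)

record = KinesisFirehoseRecord(
    {
        "recordId": "49546986683135544286507457936321625675700192471156785154",  # hypothetical
        "approximateArrivalTimestamp": 1664028820148,
        "data": "eyJrZXkiOiAidmFsdWUifQ==",  # base64 of '{"key": "value"}'
    },
)

processed = record.build_data_transformation_response(
    result="Ok",
    data=record.data,  # pass the original base64 payload through unchanged
    metadata=KinesisFirehoseDataTransformationRecordMetadata(
        partition_keys={"year": "2023", "month": "09"},
    ),
)

processed.asdict()["metadata"]  # {"partitionKeys": {"year": "2023", "month": "09"}}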

Inherited members

class KinesisFirehoseRecordMetadata (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides read-only access to a wrapped dict

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KinesisFirehoseRecordMetadata(DictWrapper):
    @property
    def _metadata(self) -> dict:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        return self["kinesisRecordMetadata"]  # could raise KeyError

    @property
    def shard_id(self) -> str:
        """Kinesis stream shard ID; present only when Kinesis Stream is source"""
        return self._metadata["shardId"]

    @property
    def partition_key(self) -> str:
        """Kinesis stream partition key; present only when Kinesis Stream is source"""
        return self._metadata["partitionKey"]

    @property
    def approximate_arrival_timestamp(self) -> int:
        """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
        return self._metadata["approximateArrivalTimestamp"]

    @property
    def sequence_number(self) -> str:
        """Kinesis stream sequence number; present only when Kinesis Stream is source"""
        return self._metadata["sequenceNumber"]

    @property
    def subsequence_number(self) -> int:
        """Kinesis stream sub-sequence number; present only when Kinesis Stream is source

        Note: this will only be present for Kinesis streams using record aggregation
        """
        return self._metadata["subsequenceNumber"]

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

prop approximate_arrival_timestamp : int

Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source

Expand source code
@property
def approximate_arrival_timestamp(self) -> int:
    """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
    return self._metadata["approximateArrivalTimestamp"]
prop partition_key : str

Kinesis stream partition key; present only when Kinesis Stream is source

Expand source code
@property
def partition_key(self) -> str:
    """Kinesis stream partition key; present only when Kinesis Stream is source"""
    return self._metadata["partitionKey"]
prop sequence_number : str

Kinesis stream sequence number; present only when Kinesis Stream is source

Expand source code
@property
def sequence_number(self) -> str:
    """Kinesis stream sequence number; present only when Kinesis Stream is source"""
    return self._metadata["sequenceNumber"]
prop shard_id : str

Kinesis stream shard ID; present only when Kinesis Stream is source

Expand source code
@property
def shard_id(self) -> str:
    """Kinesis stream shard ID; present only when Kinesis Stream is source"""
    return self._metadata["shardId"]
prop subsequence_number : int

Kinesis stream sub-sequence number; present only when Kinesis Stream is source

Note: this will only be present for Kinesis streams using record aggregation

Expand source code
@property
def subsequence_number(self) -> int:
    """Kinesis stream sub-sequence number; present only when Kinesis Stream is source

    Note: this will only be present for Kinesis streams using record aggregation
    """
    return self._metadata["subsequenceNumber"]

Inherited members