Module aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event

Expand source code
import base64
import json
import warnings
from dataclasses import dataclass, field
from typing import Any, Callable, ClassVar, Dict, Iterator, List, Optional, Tuple

from typing_extensions import Literal

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


@dataclass(repr=False, order=False, frozen=True)
class KinesisFirehoseDataTransformationRecordMetadata:
    """
    Metadata in Firehose Data Transform Record.

    Parameters
    ----------
    partition_keys: Dict[str, str]
        A dict of partition key/value pairs in string format, e.g. `{"year":"2023","month":"09"}`

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    """

    partition_keys: Dict[str, str] = field(default_factory=lambda: {})

    def asdict(self) -> Dict:
        if self.partition_keys is not None:
            return {"partitionKeys": self.partition_keys}
        return {}


@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationRecord:
    """Record in Kinesis Data Firehose response object.

    Parameters
    ----------
    record_id: str
        uniquely identifies this record within the current batch
    result: Literal["Ok", "Dropped", "ProcessingFailed"]
        record data transformation status, whether it succeeded, should be dropped, or failed.
    data: str
        base64-encoded payload, by default empty string.

        Use `data_from_text` or `data_from_json` methods to convert data if needed.

    metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata]
        Metadata associated with this record; can contain partition keys.

        See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    json_serializer: Callable
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer: Callable
        function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`,
        by default json.loads

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
    """

    _valid_result_types: ClassVar[Tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed")

    record_id: str
    result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
    data: str = ""
    metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None
    json_serializer: Callable = json.dumps
    json_deserializer: Callable = json.loads
    _json_data: Optional[Any] = None

    def asdict(self) -> Dict:
        if self.result not in self._valid_result_types:
            warnings.warn(
                stacklevel=1,
                message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"',
            )

        record: Dict[str, Any] = {
            "recordId": self.record_id,
            "result": self.result,
            "data": self.data,
        }
        if self.metadata:
            record["metadata"] = self.metadata.asdict()
        return record

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        if not self.data:
            return b""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        if not self.data:
            return ""
        return self.data_as_bytes.decode("utf-8")

    @property
    def data_as_json(self) -> Dict:
        """Decoded base64-encoded data loaded to json"""
        if not self.data:
            return {}
        if self._json_data is None:
            self._json_data = self.json_deserializer(self.data_as_text)
        return self._json_data


@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationResponse:
    """Kinesis Data Firehose response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

    Parameters
    ----------
    records : List[KinesisFirehoseDataTransformationRecord]
        records of the Kinesis Data Firehose response object;
        optional at initialization and can be added later using the `add_record` method.

    Examples
    --------

    **Transforming data records**

    ```python
    from aws_lambda_powertools.utilities.data_classes import (
        KinesisFirehoseDataTransformationRecord,
        KinesisFirehoseDataTransformationResponse,
        KinesisFirehoseEvent,
    )
    from aws_lambda_powertools.utilities.serialization import base64_from_json
    from aws_lambda_powertools.utilities.typing import LambdaContext


    def lambda_handler(event: dict, context: LambdaContext):
        firehose_event = KinesisFirehoseEvent(event)
        result = KinesisFirehoseDataTransformationResponse()

        for record in firehose_event.records:
            payload = record.data_as_text  # base64 decoded data as str

            ## generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )

            result.add_record(processed_record)

        # return transformed records
        return result.asdict()
    ```
    """

    records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)

    def add_record(self, record: KinesisFirehoseDataTransformationRecord):
        self.records.append(record)

    def asdict(self) -> Dict:
        if not self.records:
            raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")

        return {"records": [record.asdict() for record in self.records]}


class KinesisFirehoseRecordMetadata(DictWrapper):
    @property
    def _metadata(self) -> dict:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        return self["kinesisRecordMetadata"]  # could raise KeyError

    @property
    def shard_id(self) -> str:
        """Kinesis stream shard ID; present only when Kinesis Stream is source"""
        return self._metadata["shardId"]

    @property
    def partition_key(self) -> str:
        """Kinesis stream partition key; present only when Kinesis Stream is source"""
        return self._metadata["partitionKey"]

    @property
    def approximate_arrival_timestamp(self) -> int:
        """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
        return self._metadata["approximateArrivalTimestamp"]

    @property
    def sequence_number(self) -> str:
        """Kinesis stream sequence number; present only when Kinesis Stream is source"""
        return self._metadata["sequenceNumber"]

    @property
    def subsequence_number(self) -> int:
        """Kinesis stream sub-sequence number; present only when Kinesis Stream is source

        Note: this will only be present for Kinesis streams using record aggregation
        """
        return self._metadata["subsequenceNumber"]


class KinesisFirehoseRecord(DictWrapper):
    @property
    def approximate_arrival_timestamp(self) -> int:
        """The approximate time that the record was inserted into the delivery stream"""
        return self["approximateArrivalTimestamp"]

    @property
    def record_id(self) -> str:
        """Record ID; uniquely identifies this record within the current batch"""
        return self["recordId"]

    @property
    def data(self) -> str:
        """The data blob, base64-encoded"""
        return self["data"]

    @property
    def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        return self.data_as_bytes.decode("utf-8")

    @property
    def data_as_json(self) -> dict:
        """Decoded base64-encoded data loaded to json"""
        if self._json_data is None:
            self._json_data = self._json_deserializer(self.data_as_text)
        return self._json_data

    def build_data_transformation_response(
        self,
        result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
        data: str = "",
        metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None,
    ) -> KinesisFirehoseDataTransformationRecord:
        """Create a KinesisFirehoseResponseRecord directly using the record_id and given values

        Parameters
        ----------
        result : Literal["Ok", "Dropped", "ProcessingFailed"]
            processing result, supported values: Ok, Dropped, ProcessingFailed
        data : str, optional
            base64-encoded data blob; optional at init. Pass base64-encoded data directly,
            or use helper methods such as `data_from_text` or `data_from_json` to populate it
        metadata: KinesisFirehoseDataTransformationRecordMetadata, optional
            Metadata associated with this record; can contain partition keys
            - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
        """
        return KinesisFirehoseDataTransformationRecord(
            record_id=self.record_id,
            result=result,
            data=data,
            metadata=metadata,
        )


class KinesisFirehoseEvent(DictWrapper):
    """Kinesis Data Firehose event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    @property
    def invocation_id(self) -> str:
        """Unique ID for for Lambda invocation"""
        return self["invocationId"]

    @property
    def delivery_stream_arn(self) -> str:
        """ARN of the Firehose Data Firehose Delivery Stream"""
        return self["deliveryStreamArn"]

    @property
    def source_kinesis_stream_arn(self) -> Optional[str]:
        """ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
        return self.get("sourceKinesisStreamArn")

    @property
    def region(self) -> str:
        """AWS region where the event originated eg: us-east-1"""
        return self["region"]

    @property
    def records(self) -> Iterator[KinesisFirehoseRecord]:
        for record in self["records"]:
            yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)

Classes

class KinesisFirehoseDataTransformationRecord (record_id: str, result: Literal['Ok', 'Dropped', 'ProcessingFailed'] = 'Ok', data: str = '', metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None, json_serializer: Callable = <function dumps>, json_deserializer: Callable = <function loads>)

Record in Kinesis Data Firehose response object.

Parameters

record_id : str
uniquely identifies this record within the current batch
result : Literal["Ok", "Dropped", "ProcessingFailed"]
record data transformation status, whether it succeeded, should be dropped, or failed.
data : str

base64-encoded payload, by default empty string.

Use data_from_text or data_from_json methods to convert data if needed.

metadata : Optional[KinesisFirehoseDataTransformationRecordMetadata]

Metadata associated with this record; can contain partition keys.

See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

json_serializer : Callable
function to serialize obj to a JSON formatted str, by default json.dumps
json_deserializer : Callable
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads

Documentation:

- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Expand source code
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationRecord:
    """Record in Kinesis Data Firehose response object.

    Parameters
    ----------
    record_id: str
        uniquely identifies this record within the current batch
    result: Literal["Ok", "Dropped", "ProcessingFailed"]
        record data transformation status, whether it succeeded, should be dropped, or failed.
    data: str
        base64-encoded payload, by default empty string.

        Use `data_from_text` or `data_from_json` methods to convert data if needed.

    metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata]
        Metadata associated with this record; can contain partition keys.

        See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    json_serializer: Callable
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer: Callable
        function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`,
        by default json.loads

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
    """

    _valid_result_types: ClassVar[Tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed")

    record_id: str
    result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
    data: str = ""
    metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None
    json_serializer: Callable = json.dumps
    json_deserializer: Callable = json.loads
    _json_data: Optional[Any] = None

    def asdict(self) -> Dict:
        if self.result not in self._valid_result_types:
            warnings.warn(
                stacklevel=1,
                message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"',
            )

        record: Dict[str, Any] = {
            "recordId": self.record_id,
            "result": self.result,
            "data": self.data,
        }
        if self.metadata:
            record["metadata"] = self.metadata.asdict()
        return record

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        if not self.data:
            return b""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        if not self.data:
            return ""
        return self.data_as_bytes.decode("utf-8")

    @property
    def data_as_json(self) -> Dict:
        """Decoded base64-encoded data loaded to json"""
        if not self.data:
            return {}
        if self._json_data is None:
            self._json_data = self.json_deserializer(self.data_as_text)
        return self._json_data
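
As a minimal sketch of how these fields fit together (the record ID and payload below are made-up sample values, not values from a real event), a record can be constructed directly and its decoding helpers used to inspect the payload:

```python
import base64

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
)

# hypothetical record_id and payload, for illustration only
payload = base64.b64encode(b'{"status": "ok"}').decode()
record = KinesisFirehoseDataTransformationRecord(record_id="record-001", data=payload)

assert record.data_as_text == '{"status": "ok"}'  # decoded text
assert record.data_as_json == {"status": "ok"}    # decoded via json_deserializer
```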

Class variables

var data : str
var metadata : Optional[KinesisFirehoseDataTransformationRecordMetadata]
var record_id : str
var result : Literal['Ok', 'Dropped', 'ProcessingFailed']

Instance variables

var data_as_bytes : bytes

Decoded base64-encoded data as bytes

Expand source code
@property
def data_as_bytes(self) -> bytes:
    """Decoded base64-encoded data as bytes"""
    if not self.data:
        return b""
    return base64.b64decode(self.data)
var data_as_json : Dict

Decoded base64-encoded data loaded to json

Expand source code
@property
def data_as_json(self) -> Dict:
    """Decoded base64-encoded data loaded to json"""
    if not self.data:
        return {}
    if self._json_data is None:
        self._json_data = self.json_deserializer(self.data_as_text)
    return self._json_data
var data_as_text : str

Decoded base64-encoded data as text

Expand source code
@property
def data_as_text(self) -> str:
    """Decoded base64-encoded data as text"""
    if not self.data:
        return ""
    return self.data_as_bytes.decode("utf-8")

Methods

def asdict(self) ‑> Dict
Expand source code
def asdict(self) -> Dict:
    if self.result not in self._valid_result_types:
        warnings.warn(
            stacklevel=1,
            message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"',
        )

    record: Dict[str, Any] = {
        "recordId": self.record_id,
        "result": self.result,
        "data": self.data,
    }
    if self.metadata:
        record["metadata"] = self.metadata.asdict()
    return record
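
For reference, `asdict` produces the per-record structure shown in the source above (`recordId`, `result`, `data`, plus `metadata` when set). A quick sketch with a made-up record ID:

```python
from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
)

# "eyJrIjogMX0=" is base64 for '{"k": 1}'; record_id is a made-up sample value
record = KinesisFirehoseDataTransformationRecord(record_id="record-001", data="eyJrIjogMX0=")
print(record.asdict())
# {'recordId': 'record-001', 'result': 'Ok', 'data': 'eyJrIjogMX0='}
```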
def json_deserializer(s, *, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw) ‑> Callable

Deserialize s (a str, bytes or bytearray instance containing a JSON document) to a Python object.

object_hook is an optional function that will be called with the result of any object literal decode (a dict). The return value of object_hook will be used instead of the dict. This feature can be used to implement custom decoders (e.g. JSON-RPC class hinting).

object_pairs_hook is an optional function that will be called with the result of any object literal decoded with an ordered list of pairs. The return value of object_pairs_hook will be used instead of the dict. This feature can be used to implement custom decoders. If object_hook is also defined, the object_pairs_hook takes priority.

parse_float, if specified, will be called with the string of every JSON float to be decoded. By default this is equivalent to float(num_str). This can be used to use another datatype or parser for JSON floats (e.g. decimal.Decimal).

parse_int, if specified, will be called with the string of every JSON int to be decoded. By default this is equivalent to int(num_str). This can be used to use another datatype or parser for JSON integers (e.g. float).

parse_constant, if specified, will be called with one of the following strings: -Infinity, Infinity, NaN. This can be used to raise an exception if invalid JSON numbers are encountered.

To use a custom JSONDecoder subclass, specify it with the cls kwarg; otherwise JSONDecoder is used.

Expand source code
def loads(s, *, cls=None, object_hook=None, parse_float=None,
        parse_int=None, parse_constant=None, object_pairs_hook=None, **kw):
    """Deserialize ``s`` (a ``str``, ``bytes`` or ``bytearray`` instance
    containing a JSON document) to a Python object.

    ``object_hook`` is an optional function that will be called with the
    result of any object literal decode (a ``dict``). The return value of
    ``object_hook`` will be used instead of the ``dict``. This feature
    can be used to implement custom decoders (e.g. JSON-RPC class hinting).

    ``object_pairs_hook`` is an optional function that will be called with the
    result of any object literal decoded with an ordered list of pairs.  The
    return value of ``object_pairs_hook`` will be used instead of the ``dict``.
    This feature can be used to implement custom decoders.  If ``object_hook``
    is also defined, the ``object_pairs_hook`` takes priority.

    ``parse_float``, if specified, will be called with the string
    of every JSON float to be decoded. By default this is equivalent to
    float(num_str). This can be used to use another datatype or parser
    for JSON floats (e.g. decimal.Decimal).

    ``parse_int``, if specified, will be called with the string
    of every JSON int to be decoded. By default this is equivalent to
    int(num_str). This can be used to use another datatype or parser
    for JSON integers (e.g. float).

    ``parse_constant``, if specified, will be called with one of the
    following strings: -Infinity, Infinity, NaN.
    This can be used to raise an exception if invalid JSON numbers
    are encountered.

    To use a custom ``JSONDecoder`` subclass, specify it with the ``cls``
    kwarg; otherwise ``JSONDecoder`` is used.
    """
    if isinstance(s, str):
        if s.startswith('\ufeff'):
            raise JSONDecodeError("Unexpected UTF-8 BOM (decode using utf-8-sig)",
                                  s, 0)
    else:
        if not isinstance(s, (bytes, bytearray)):
            raise TypeError(f'the JSON object must be str, bytes or bytearray, '
                            f'not {s.__class__.__name__}')
        s = s.decode(detect_encoding(s), 'surrogatepass')

    if (cls is None and object_hook is None and
            parse_int is None and parse_float is None and
            parse_constant is None and object_pairs_hook is None and not kw):
        return _default_decoder.decode(s)
    if cls is None:
        cls = JSONDecoder
    if object_hook is not None:
        kw['object_hook'] = object_hook
    if object_pairs_hook is not None:
        kw['object_pairs_hook'] = object_pairs_hook
    if parse_float is not None:
        kw['parse_float'] = parse_float
    if parse_int is not None:
        kw['parse_int'] = parse_int
    if parse_constant is not None:
        kw['parse_constant'] = parse_constant
    return cls(**kw).decode(s)
def json_serializer(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, default=None, sort_keys=False, **kw) ‑> Callable

Serialize obj to a JSON formatted str.

If skipkeys is true then dict keys that are not basic types (str, int, float, bool, None) will be skipped instead of raising a TypeError.

If ensure_ascii is false, then the return value can contain non-ASCII characters if they appear in strings contained in obj. Otherwise, all such characters are escaped in JSON strings.

If check_circular is false, then the circular reference check for container types will be skipped and a circular reference will result in an RecursionError (or worse).

If allow_nan is false, then it will be a ValueError to serialize out of range float values (nan, inf, -inf) in strict compliance of the JSON specification, instead of using the JavaScript equivalents (NaN, Infinity, -Infinity).

If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.

If specified, separators should be an (item_separator, key_separator) tuple. The default is (', ', ': ') if indent is None and (',', ': ') otherwise. To get the most compact JSON representation, you should specify (',', ':') to eliminate whitespace.

default(obj) is a function that should return a serializable version of obj or raise TypeError. The default simply raises TypeError.

If sort_keys is true (default: False), then the output of dictionaries will be sorted by key.

To use a custom JSONEncoder subclass (e.g. one that overrides the .default() method to serialize additional types), specify it with the cls kwarg; otherwise JSONEncoder is used.

Expand source code
def dumps(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True,
        allow_nan=True, cls=None, indent=None, separators=None,
        default=None, sort_keys=False, **kw):
    """Serialize ``obj`` to a JSON formatted ``str``.

    If ``skipkeys`` is true then ``dict`` keys that are not basic types
    (``str``, ``int``, ``float``, ``bool``, ``None``) will be skipped
    instead of raising a ``TypeError``.

    If ``ensure_ascii`` is false, then the return value can contain non-ASCII
    characters if they appear in strings contained in ``obj``. Otherwise, all
    such characters are escaped in JSON strings.

    If ``check_circular`` is false, then the circular reference check
    for container types will be skipped and a circular reference will
    result in an ``RecursionError`` (or worse).

    If ``allow_nan`` is false, then it will be a ``ValueError`` to
    serialize out of range ``float`` values (``nan``, ``inf``, ``-inf``) in
    strict compliance of the JSON specification, instead of using the
    JavaScript equivalents (``NaN``, ``Infinity``, ``-Infinity``).

    If ``indent`` is a non-negative integer, then JSON array elements and
    object members will be pretty-printed with that indent level. An indent
    level of 0 will only insert newlines. ``None`` is the most compact
    representation.

    If specified, ``separators`` should be an ``(item_separator, key_separator)``
    tuple.  The default is ``(', ', ': ')`` if *indent* is ``None`` and
    ``(',', ': ')`` otherwise.  To get the most compact JSON representation,
    you should specify ``(',', ':')`` to eliminate whitespace.

    ``default(obj)`` is a function that should return a serializable version
    of obj or raise TypeError. The default simply raises TypeError.

    If *sort_keys* is true (default: ``False``), then the output of
    dictionaries will be sorted by key.

    To use a custom ``JSONEncoder`` subclass (e.g. one that overrides the
    ``.default()`` method to serialize additional types), specify it with
    the ``cls`` kwarg; otherwise ``JSONEncoder`` is used.

    """
    # cached encoder
    if (not skipkeys and ensure_ascii and
        check_circular and allow_nan and
        cls is None and indent is None and separators is None and
        default is None and not sort_keys and not kw):
        return _default_encoder.encode(obj)
    if cls is None:
        cls = JSONEncoder
    return cls(
        skipkeys=skipkeys, ensure_ascii=ensure_ascii,
        check_circular=check_circular, allow_nan=allow_nan, indent=indent,
        separators=separators, default=default, sort_keys=sort_keys,
        **kw).encode(obj)
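
The `json_serializer` and `json_deserializer` fields are plain callables, so the stdlib defaults shown above can be swapped for any compatible callable. A sketch under that assumption (record ID and payload are made-up sample values):

```python
import json
from functools import partial

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
)

# Sketch only: supply stdlib json with custom options as the serializer hooks
record = KinesisFirehoseDataTransformationRecord(
    record_id="record-001",  # made-up sample value
    data="eyJrIjogMX0=",     # base64 for '{"k": 1}'
    json_serializer=partial(json.dumps, default=str),
    json_deserializer=json.loads,
)
print(record.data_as_json)  # {'k': 1}, decoded with the supplied deserializer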
class KinesisFirehoseDataTransformationRecordMetadata (partition_keys: Dict[str, str] = <factory>)

Metadata in Firehose Data Transform Record.

Parameters

partition_keys : Dict[str, str]
A dict of partition key/value pairs in string format, e.g. {"year":"2023","month":"09"}

Documentation:

- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

Expand source code
@dataclass(repr=False, order=False, frozen=True)
class KinesisFirehoseDataTransformationRecordMetadata:
    """
    Metadata in Firehose Data Transform Record.

    Parameters
    ----------
    partition_keys: Dict[str, str]
        A dict of partition key/value pairs in string format, e.g. `{"year":"2023","month":"09"}`

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    """

    partition_keys: Dict[str, str] = field(default_factory=lambda: {})

    def asdict(self) -> Dict:
        if self.partition_keys is not None:
            return {"partitionKeys": self.partition_keys}
        return {}

Class variables

var partition_keys : Dict[str, str]

Methods

def asdict(self) ‑> Dict
Expand source code
def asdict(self) -> Dict:
    if self.partition_keys is not None:
        return {"partitionKeys": self.partition_keys}
    return {}
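
A sketch of attaching partition keys to a response record for dynamic partitioning (the record ID and key values are made-up sample values; imports use this module's path directly):

```python
from aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationRecordMetadata,
)

# made-up partition key values for illustration
metadata = KinesisFirehoseDataTransformationRecordMetadata(
    partition_keys={"year": "2023", "month": "09"},
)
record = KinesisFirehoseDataTransformationRecord(record_id="record-001", metadata=metadata)
print(record.asdict()["metadata"])
# {'partitionKeys': {'year': '2023', 'month': '09'}}
```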
class KinesisFirehoseDataTransformationResponse (records: List[KinesisFirehoseDataTransformationRecord] = <factory>)

Kinesis Data Firehose response object

Documentation:

- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Parameters

records : List[KinesisFirehoseDataTransformationRecord]
records of the Kinesis Data Firehose response object; optional at initialization and can be added later using the add_record method.

Examples

Transforming data records

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: dict, context: LambdaContext):
    firehose_event = KinesisFirehoseEvent(event)
    result = KinesisFirehoseDataTransformationResponse()

    for record in firehose_event.records:
        payload = record.data_as_text  # base64 decoded data as str

        ## generate data to return
        transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
        processed_record = KinesisFirehoseDataTransformationRecord(
            record_id=record.record_id,
            data=base64_from_json(transformed_data),
        )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()
Expand source code
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationResponse:
    """Kinesis Data Firehose response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

    Parameters
    ----------
    records : List[KinesisFirehoseDataTransformationRecord]
        records of the Kinesis Data Firehose response object;
        optional at initialization and can be added later using the `add_record` method.

    Examples
    --------

    **Transforming data records**

    ```python
    from aws_lambda_powertools.utilities.data_classes import (
        KinesisFirehoseDataTransformationRecord,
        KinesisFirehoseDataTransformationResponse,
        KinesisFirehoseEvent,
    )
    from aws_lambda_powertools.utilities.serialization import base64_from_json
    from aws_lambda_powertools.utilities.typing import LambdaContext


    def lambda_handler(event: dict, context: LambdaContext):
        firehose_event = KinesisFirehoseEvent(event)
        result = KinesisFirehoseDataTransformationResponse()

        for record in firehose_event.records:
            payload = record.data_as_text  # base64 decoded data as str

            ## generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )

            result.add_record(processed_record)

        # return transformed records
        return result.asdict()
    ```
    """

    records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)

    def add_record(self, record: KinesisFirehoseDataTransformationRecord):
        self.records.append(record)

    def asdict(self) -> Dict:
        if not self.records:
            raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")

        return {"records": [record.asdict() for record in self.records]}

Class variables

var records : List[KinesisFirehoseDataTransformationRecord]

Methods

def add_record(self, record: KinesisFirehoseDataTransformationRecord)
Expand source code
def add_record(self, record: KinesisFirehoseDataTransformationRecord):
    self.records.append(record)
def asdict(self) ‑> Dict
Expand source code
def asdict(self) -> Dict:
    if not self.records:
        raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")

    return {"records": [record.asdict() for record in self.records]}
class KinesisFirehoseEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Kinesis Data Firehose event

Documentation:

- https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KinesisFirehoseEvent(DictWrapper):
    """Kinesis Data Firehose event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    @property
    def invocation_id(self) -> str:
        """Unique ID for for Lambda invocation"""
        return self["invocationId"]

    @property
    def delivery_stream_arn(self) -> str:
        """ARN of the Firehose Data Firehose Delivery Stream"""
        return self["deliveryStreamArn"]

    @property
    def source_kinesis_stream_arn(self) -> Optional[str]:
        """ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
        return self.get("sourceKinesisStreamArn")

    @property
    def region(self) -> str:
        """AWS region where the event originated eg: us-east-1"""
        return self["region"]

    @property
    def records(self) -> Iterator[KinesisFirehoseRecord]:
        for record in self["records"]:
            yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var delivery_stream_arn : str

ARN of the Kinesis Data Firehose Delivery Stream

Expand source code
@property
def delivery_stream_arn(self) -> str:
    """ARN of the Firehose Data Firehose Delivery Stream"""
    return self["deliveryStreamArn"]
var invocation_id : str

Unique ID for Lambda invocation

Expand source code
@property
def invocation_id(self) -> str:
    """Unique ID for for Lambda invocation"""
    return self["invocationId"]
var records : Iterator[KinesisFirehoseRecord]
Expand source code
@property
def records(self) -> Iterator[KinesisFirehoseRecord]:
    for record in self["records"]:
        yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)
var region : str

AWS region where the event originated, e.g. us-east-1

Expand source code
@property
def region(self) -> str:
    """AWS region where the event originated eg: us-east-1"""
    return self["region"]
var source_kinesis_stream_arn : Optional[str]

ARN of the Kinesis Stream; present only when Kinesis Stream is source

Expand source code
@property
def source_kinesis_stream_arn(self) -> Optional[str]:
    """ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
    return self.get("sourceKinesisStreamArn")
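
A brief sketch of using these properties inside a handler; the check on source_kinesis_stream_arn distinguishes a delivery stream fed by a Kinesis data stream from one receiving direct PUTs (the event is assumed to follow the documented Firehose invocation event shape):

```python
from aws_lambda_powertools.utilities.data_classes import KinesisFirehoseEvent
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: dict, context: LambdaContext):
    firehose_event = KinesisFirehoseEvent(event)

    if firehose_event.source_kinesis_stream_arn:
        # delivery stream is fed by a Kinesis data stream
        print(f"Source stream: {firehose_event.source_kinesis_stream_arn}")

    print(firehose_event.invocation_id, firehose_event.region)
    for record in firehose_event.records:
        print(record.record_id, record.data_as_text)

    # note: a real transform handler must return a
    # KinesisFirehoseDataTransformationResponse dict (see the example above)
```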

class KinesisFirehoseRecord (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides read-only access to a wrapped dict

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KinesisFirehoseRecord(DictWrapper):
    @property
    def approximate_arrival_timestamp(self) -> int:
        """The approximate time that the record was inserted into the delivery stream"""
        return self["approximateArrivalTimestamp"]

    @property
    def record_id(self) -> str:
        """Record ID; uniquely identifies this record within the current batch"""
        return self["recordId"]

    @property
    def data(self) -> str:
        """The data blob, base64-encoded"""
        return self["data"]

    @property
    def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        return self.data_as_bytes.decode("utf-8")

    @property
    def data_as_json(self) -> dict:
        """Decoded base64-encoded data loaded to json"""
        if self._json_data is None:
            self._json_data = self._json_deserializer(self.data_as_text)
        return self._json_data

    def build_data_transformation_response(
        self,
        result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
        data: str = "",
        metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None,
    ) -> KinesisFirehoseDataTransformationRecord:
        """Create a KinesisFirehoseResponseRecord directly using the record_id and given values

        Parameters
        ----------
        result : Literal["Ok", "Dropped", "ProcessingFailed"]
            processing result, supported values: Ok, Dropped, ProcessingFailed
        data : str, optional
            base64-encoded data blob; optional at init. Pass base64-encoded data directly,
            or use helper methods such as `data_from_text` or `data_from_json` to populate it
        metadata: KinesisFirehoseDataTransformationRecordMetadata, optional
            Metadata associated with this record; can contain partition keys
            - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
        """
        return KinesisFirehoseDataTransformationRecord(
            record_id=self.record_id,
            result=result,
            data=data,
            metadata=metadata,
        )

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var approximate_arrival_timestamp : int

The approximate time that the record was inserted into the delivery stream

Expand source code
@property
def approximate_arrival_timestamp(self) -> int:
    """The approximate time that the record was inserted into the delivery stream"""
    return self["approximateArrivalTimestamp"]
var data : str

The data blob, base64-encoded

Expand source code
@property
def data(self) -> str:
    """The data blob, base64-encoded"""
    return self["data"]
var data_as_bytes : bytes

Decoded base64-encoded data as bytes

Expand source code
@property
def data_as_bytes(self) -> bytes:
    """Decoded base64-encoded data as bytes"""
    return base64.b64decode(self.data)
var data_as_json : dict

Decoded base64-encoded data loaded to json

Expand source code
@property
def data_as_json(self) -> dict:
    """Decoded base64-encoded data loaded to json"""
    if self._json_data is None:
        self._json_data = self._json_deserializer(self.data_as_text)
    return self._json_data
var data_as_text : str

Decoded base64-encoded data as text

Expand source code
@property
def data_as_text(self) -> str:
    """Decoded base64-encoded data as text"""
    return self.data_as_bytes.decode("utf-8")
var metadata : Optional[KinesisFirehoseRecordMetadata]

Optional: metadata associated with this record; present only when Kinesis Stream is source

Expand source code
@property
def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]:
    """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
    return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None
var record_id : str

Record ID; uniquely identifies this record within the current batch

Expand source code
@property
def record_id(self) -> str:
    """Record ID; uniquely identifies this record within the current batch"""
    return self["recordId"]

Methods

def build_data_transformation_response(self, result: Literal['Ok', 'Dropped', 'ProcessingFailed'] = 'Ok', data: str = '', metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None) ‑> KinesisFirehoseDataTransformationRecord

Create a KinesisFirehoseDataTransformationRecord directly using the record_id and given values

Parameters

result : Literal["Ok", "Dropped", "ProcessingFailed"]
processing result, supported values: Ok, Dropped, ProcessingFailed
data : str, optional
base64-encoded data blob; optional at init. Pass base64-encoded data directly, or use helper methods such as data_from_text or data_from_json to populate it
metadata : KinesisFirehoseDataTransformationRecordMetadata, optional
Metadata associated with this record; can contain partition keys - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
Expand source code
def build_data_transformation_response(
    self,
    result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
    data: str = "",
    metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None,
) -> KinesisFirehoseDataTransformationRecord:
    """Create a KinesisFirehoseResponseRecord directly using the record_id and given values

    Parameters
    ----------
    result : Literal["Ok", "Dropped", "ProcessingFailed"]
        processing result, supported values: Ok, Dropped, ProcessingFailed
    data : str, optional
        base64-encoded data blob; optional at init. Pass base64-encoded data directly,
        or use helper methods such as `data_from_text` or `data_from_json` to populate it
    metadata: KinesisFirehoseDataTransformationRecordMetadata, optional
        Metadata associated with this record; can contain partition keys
        - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    """
    return KinesisFirehoseDataTransformationRecord(
        record_id=self.record_id,
        result=result,
        data=data,
        metadata=metadata,
    )
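
A sketch of how this method removes the need to construct response records by hand inside a handler loop; base64_from_json is the same helper used in the module-level example above:

```python
from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: dict, context: LambdaContext):
    firehose_event = KinesisFirehoseEvent(event)
    result = KinesisFirehoseDataTransformationResponse()

    for record in firehose_event.records:
        transformed = {"original_payload": record.data_as_text}
        # record_id is carried over automatically; result defaults to "Ok"
        result.add_record(
            record.build_data_transformation_response(data=base64_from_json(transformed)),
        )

    return result.asdict()
```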

class KinesisFirehoseRecordMetadata (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides read-only access to a wrapped dict

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KinesisFirehoseRecordMetadata(DictWrapper):
    @property
    def _metadata(self) -> dict:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        return self["kinesisRecordMetadata"]  # could raise KeyError

    @property
    def shard_id(self) -> str:
        """Kinesis stream shard ID; present only when Kinesis Stream is source"""
        return self._metadata["shardId"]

    @property
    def partition_key(self) -> str:
        """Kinesis stream partition key; present only when Kinesis Stream is source"""
        return self._metadata["partitionKey"]

    @property
    def approximate_arrival_timestamp(self) -> int:
        """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
        return self._metadata["approximateArrivalTimestamp"]

    @property
    def sequence_number(self) -> str:
        """Kinesis stream sequence number; present only when Kinesis Stream is source"""
        return self._metadata["sequenceNumber"]

    @property
    def subsequence_number(self) -> int:
        """Kinesis stream sub-sequence number; present only when Kinesis Stream is source

        Note: this will only be present for Kinesis streams using record aggregation
        """
        return self._metadata["subsequenceNumber"]

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var approximate_arrival_timestamp : int

Kinesis stream approximate arrival timestamp; present only when Kinesis Stream is source

Expand source code
@property
def approximate_arrival_timestamp(self) -> int:
    """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
    return self._metadata["approximateArrivalTimestamp"]
var partition_key : str

Kinesis stream partition key; present only when Kinesis Stream is source

Expand source code
@property
def partition_key(self) -> str:
    """Kinesis stream partition key; present only when Kinesis Stream is source"""
    return self._metadata["partitionKey"]
var sequence_number : str

Kinesis stream sequence number; present only when Kinesis Stream is source

Expand source code
@property
def sequence_number(self) -> str:
    """Kinesis stream sequence number; present only when Kinesis Stream is source"""
    return self._metadata["sequenceNumber"]
var shard_id : str

Kinesis stream shard ID; present only when Kinesis Stream is source

Expand source code
@property
def shard_id(self) -> str:
    """Kinesis stream shard ID; present only when Kinesis Stream is source"""
    return self._metadata["shardId"]
var subsequence_number : int

Kinesis stream sub-sequence number; present only when Kinesis Stream is source

Note: this will only be present for Kinesis streams using record aggregation

Expand source code
@property
def subsequence_number(self) -> int:
    """Kinesis stream sub-sequence number; present only when Kinesis Stream is source

    Note: this will only be present for Kinesis streams using record aggregation
    """
    return self._metadata["subsequenceNumber"]
