Module aws_lambda_powertools.utilities.data_classes.kinesis_stream_event

Expand source code
import base64
import json
import zlib
from typing import Iterator, List

from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import (
    CloudWatchLogsDecodedData,
)
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class KinesisStreamRecordPayload(DictWrapper):
    """Accessors for the ``kinesis`` section of a Kinesis stream record.

    The wrapped mapping is the whole record; every accessor below reads from
    its nested ``"kinesis"`` entry.
    """

    @property
    def approximate_arrival_timestamp(self) -> float:
        """Approximate time the record was inserted into the stream, as a float."""
        raw_timestamp = self["kinesis"]["approximateArrivalTimestamp"]
        return float(raw_timestamp)

    @property
    def data(self) -> str:
        """The data blob exactly as stored in the record (not yet base64-decoded)."""
        payload = self["kinesis"]
        return payload["data"]

    @property
    def kinesis_schema_version(self) -> str:
        """Schema version of the record."""
        payload = self["kinesis"]
        return payload["kinesisSchemaVersion"]

    @property
    def partition_key(self) -> str:
        """Key identifying which shard in the stream the data record is assigned to."""
        payload = self["kinesis"]
        return payload["partitionKey"]

    @property
    def sequence_number(self) -> str:
        """Unique identifier of the record within its shard."""
        payload = self["kinesis"]
        return payload["sequenceNumber"]

    def data_as_bytes(self) -> bytes:
        """Base64-decode the data blob and return the raw bytes."""
        encoded = self.data
        return base64.b64decode(encoded)

    def data_as_text(self) -> str:
        """Base64-decode the data blob and decode the bytes as UTF-8 text."""
        raw = self.data_as_bytes()
        return raw.decode("utf-8")

    def data_as_json(self) -> dict:
        """Base64-decode the data blob and parse the UTF-8 text as JSON."""
        text = self.data_as_text()
        return json.loads(text)

    def data_zlib_compressed_as_json(self) -> dict:
        """Base64-decode the data blob, decompress it, and parse the result as JSON."""
        compressed = self.data_as_bytes()
        # MAX_WBITS | 32 lets zlib auto-detect a zlib or gzip header.
        inflated = zlib.decompress(compressed, zlib.MAX_WBITS | 32)
        return json.loads(inflated)


class KinesisStreamRecord(DictWrapper):
    """Read-only view over a single record of a Kinesis stream event."""

    @property
    def aws_region(self) -> str:
        """AWS region where the event originated, e.g. ``us-east-1``."""
        region = self["awsRegion"]
        return region

    @property
    def event_id(self) -> str:
        """Globally unique identifier of the event recorded in this stream record."""
        identifier = self["eventID"]
        return identifier

    @property
    def event_name(self) -> str:
        """Event type, e.g. ``aws:kinesis:record``."""
        name = self["eventName"]
        return name

    @property
    def event_source(self) -> str:
        """AWS service the event originated from; for Kinesis this is ``aws:kinesis``."""
        source = self["eventSource"]
        return source

    @property
    def event_source_arn(self) -> str:
        """Amazon Resource Name (ARN) of the event source."""
        source_arn = self["eventSourceARN"]
        return source_arn

    @property
    def event_version(self) -> str:
        """Value of ``eventVersion``: a major and minor version in the form <major>.<minor>."""
        version = self["eventVersion"]
        return version

    @property
    def invoke_identity_arn(self) -> str:
        """ARN of the identity used to invoke the Lambda function."""
        identity_arn = self["invokeIdentityArn"]
        return identity_arn

    @property
    def kinesis(self) -> KinesisStreamRecordPayload:
        """Underlying Kinesis record, wrapped in ``KinesisStreamRecordPayload``.

        The payload wrapper receives this record's whole underlying dict and
        performs the ``"kinesis"`` lookup itself.
        """
        record_data = self._data
        return KinesisStreamRecordPayload(record_data)


class KinesisStreamEvent(DictWrapper):
    """Wrapper around a Kinesis stream event payload

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
    """

    @property
    def records(self) -> Iterator[KinesisStreamRecord]:
        """Lazily yield each entry of ``Records`` wrapped as a ``KinesisStreamRecord``."""
        # ``yield from`` keeps this a generator, so ``Records`` is only read
        # once iteration actually starts.
        yield from map(KinesisStreamRecord, self["Records"])


def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> List[CloudWatchLogsDecodedData]:
    """Decode every record of a Kinesis stream event into CloudWatch Logs data.

    Each record's data blob is base64-decoded, decompressed and parsed as JSON
    (via ``data_zlib_compressed_as_json``) before being wrapped.

    Parameters
    ----------
    event : KinesisStreamEvent
        Event whose records carry compressed CloudWatch Logs payloads

    Returns
    -------
    List[CloudWatchLogsDecodedData]
        One decoded entry per record, in the order the records are yielded
    """
    decoded_logs = []
    for record in event.records:
        log_payload = record.kinesis.data_zlib_compressed_as_json()
        decoded_logs.append(CloudWatchLogsDecodedData(log_payload))
    return decoded_logs


def extract_cloudwatch_logs_from_record(record: KinesisStreamRecord) -> CloudWatchLogsDecodedData:
    """Decode a single Kinesis stream record into CloudWatch Logs data.

    The record's data blob is base64-decoded, decompressed and parsed as JSON
    (via ``data_zlib_compressed_as_json``) before being wrapped.

    Parameters
    ----------
    record : KinesisStreamRecord
        Record carrying a compressed CloudWatch Logs payload

    Returns
    -------
    CloudWatchLogsDecodedData
        Wrapper around the decoded JSON payload
    """
    log_payload = record.kinesis.data_zlib_compressed_as_json()
    return CloudWatchLogsDecodedData(data=log_payload)

Functions

def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) ‑> List[CloudWatchLogsDecodedData]
Expand source code
def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> List[CloudWatchLogsDecodedData]:
    """Decode every record of a Kinesis stream event into CloudWatch Logs data."""
    decoded_logs = []
    for record in event.records:
        log_payload = record.kinesis.data_zlib_compressed_as_json()
        decoded_logs.append(CloudWatchLogsDecodedData(log_payload))
    return decoded_logs
def extract_cloudwatch_logs_from_record(record: KinesisStreamRecord) ‑> CloudWatchLogsDecodedData
Expand source code
def extract_cloudwatch_logs_from_record(record: KinesisStreamRecord) -> CloudWatchLogsDecodedData:
    """Decode a single Kinesis stream record into CloudWatch Logs data."""
    log_payload = record.kinesis.data_zlib_compressed_as_json()
    return CloudWatchLogsDecodedData(data=log_payload)

Classes

class KinesisStreamEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Kinesis stream event

Documentation: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KinesisStreamEvent(DictWrapper):
    """Wrapper around a Kinesis stream event payload

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
    """

    @property
    def records(self) -> Iterator[KinesisStreamRecord]:
        """Lazily yield each entry of ``Records`` wrapped as a ``KinesisStreamRecord``."""
        # ``yield from`` keeps this a generator, so ``Records`` is only read
        # once iteration actually starts.
        yield from map(KinesisStreamRecord, self["Records"])

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var records : Iterator[KinesisStreamRecord]
Expand source code
@property
def records(self) -> Iterator[KinesisStreamRecord]:
    """Lazily yield each entry of ``Records`` wrapped as a ``KinesisStreamRecord``."""
    yield from map(KinesisStreamRecord, self["Records"])

Inherited members

class KinesisStreamRecord (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides read-only access to a wrapped dict

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KinesisStreamRecord(DictWrapper):
    """Read-only view over a single record of a Kinesis stream event."""

    @property
    def aws_region(self) -> str:
        """AWS region where the event originated, e.g. ``us-east-1``."""
        region = self["awsRegion"]
        return region

    @property
    def event_id(self) -> str:
        """Globally unique identifier of the event recorded in this stream record."""
        identifier = self["eventID"]
        return identifier

    @property
    def event_name(self) -> str:
        """Event type, e.g. ``aws:kinesis:record``."""
        name = self["eventName"]
        return name

    @property
    def event_source(self) -> str:
        """AWS service the event originated from; for Kinesis this is ``aws:kinesis``."""
        source = self["eventSource"]
        return source

    @property
    def event_source_arn(self) -> str:
        """Amazon Resource Name (ARN) of the event source."""
        source_arn = self["eventSourceARN"]
        return source_arn

    @property
    def event_version(self) -> str:
        """Value of ``eventVersion``: a major and minor version in the form <major>.<minor>."""
        version = self["eventVersion"]
        return version

    @property
    def invoke_identity_arn(self) -> str:
        """ARN of the identity used to invoke the Lambda function."""
        identity_arn = self["invokeIdentityArn"]
        return identity_arn

    @property
    def kinesis(self) -> KinesisStreamRecordPayload:
        """Underlying Kinesis record, wrapped in ``KinesisStreamRecordPayload``."""
        record_data = self._data
        return KinesisStreamRecordPayload(record_data)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var aws_region : str

AWS region where the event originated eg: us-east-1

Expand source code
@property
def aws_region(self) -> str:
    """AWS region where the event originated, e.g. ``us-east-1``."""
    region = self["awsRegion"]
    return region
var event_id : str

A globally unique identifier for the event that was recorded in this stream record.

Expand source code
@property
def event_id(self) -> str:
    """Globally unique identifier of the event recorded in this stream record."""
    identifier = self["eventID"]
    return identifier
var event_name : str

Event type eg: aws:kinesis:record

Expand source code
@property
def event_name(self) -> str:
    """Event type, e.g. ``aws:kinesis:record``."""
    name = self["eventName"]
    return name
var event_source : str

The AWS service from which the Kinesis event originated. For Kinesis, this is aws:kinesis

Expand source code
@property
def event_source(self) -> str:
    """AWS service the event originated from; for Kinesis this is ``aws:kinesis``."""
    source = self["eventSource"]
    return source
var event_source_arn : str

The Amazon Resource Name (ARN) of the event source

Expand source code
@property
def event_source_arn(self) -> str:
    """Amazon Resource Name (ARN) of the event source."""
    source_arn = self["eventSourceARN"]
    return source_arn
var event_version : str

The eventVersion key value contains a major and minor version in the form <major>.<minor>.

Expand source code
@property
def event_version(self) -> str:
    """Value of ``eventVersion``: a major and minor version in the form <major>.<minor>."""
    version = self["eventVersion"]
    return version
var invoke_identity_arn : str

The ARN for the identity used to invoke the Lambda Function

Expand source code
@property
def invoke_identity_arn(self) -> str:
    """ARN of the identity used to invoke the Lambda function."""
    identity_arn = self["invokeIdentityArn"]
    return identity_arn
var kinesis : KinesisStreamRecordPayload

Underlying Kinesis record associated with the event

Expand source code
@property
def kinesis(self) -> KinesisStreamRecordPayload:
    """Underlying Kinesis record, wrapped in ``KinesisStreamRecordPayload``."""
    record_data = self._data
    return KinesisStreamRecordPayload(record_data)

Inherited members

class KinesisStreamRecordPayload (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides read-only access to a wrapped dict

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KinesisStreamRecordPayload(DictWrapper):
    """Accessors for the ``kinesis`` section of a Kinesis stream record.

    The wrapped mapping is the whole record; every accessor below reads from
    its nested ``"kinesis"`` entry.
    """

    @property
    def approximate_arrival_timestamp(self) -> float:
        """Approximate time the record was inserted into the stream, as a float."""
        raw_timestamp = self["kinesis"]["approximateArrivalTimestamp"]
        return float(raw_timestamp)

    @property
    def data(self) -> str:
        """The data blob exactly as stored in the record (not yet base64-decoded)."""
        payload = self["kinesis"]
        return payload["data"]

    @property
    def kinesis_schema_version(self) -> str:
        """Schema version of the record."""
        payload = self["kinesis"]
        return payload["kinesisSchemaVersion"]

    @property
    def partition_key(self) -> str:
        """Key identifying which shard in the stream the data record is assigned to."""
        payload = self["kinesis"]
        return payload["partitionKey"]

    @property
    def sequence_number(self) -> str:
        """Unique identifier of the record within its shard."""
        payload = self["kinesis"]
        return payload["sequenceNumber"]

    def data_as_bytes(self) -> bytes:
        """Base64-decode the data blob and return the raw bytes."""
        encoded = self.data
        return base64.b64decode(encoded)

    def data_as_text(self) -> str:
        """Base64-decode the data blob and decode the bytes as UTF-8 text."""
        raw = self.data_as_bytes()
        return raw.decode("utf-8")

    def data_as_json(self) -> dict:
        """Base64-decode the data blob and parse the UTF-8 text as JSON."""
        text = self.data_as_text()
        return json.loads(text)

    def data_zlib_compressed_as_json(self) -> dict:
        """Base64-decode the data blob, decompress it, and parse the result as JSON."""
        compressed = self.data_as_bytes()
        # MAX_WBITS | 32 lets zlib auto-detect a zlib or gzip header.
        inflated = zlib.decompress(compressed, zlib.MAX_WBITS | 32)
        return json.loads(inflated)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var approximate_arrival_timestamp : float

The approximate time that the record was inserted into the stream

Expand source code
@property
def approximate_arrival_timestamp(self) -> float:
    """Approximate time the record was inserted into the stream, as a float."""
    raw_timestamp = self["kinesis"]["approximateArrivalTimestamp"]
    return float(raw_timestamp)
var data : str

The data blob

Expand source code
@property
def data(self) -> str:
    """The data blob exactly as stored in the record."""
    payload = self["kinesis"]
    return payload["data"]
var kinesis_schema_version : str

Schema version for the record

Expand source code
@property
def kinesis_schema_version(self) -> str:
    """Schema version of the record."""
    payload = self["kinesis"]
    return payload["kinesisSchemaVersion"]
var partition_key : str

Identifies which shard in the stream the data record is assigned to

Expand source code
@property
def partition_key(self) -> str:
    """Key identifying which shard in the stream the data record is assigned to."""
    payload = self["kinesis"]
    return payload["partitionKey"]
var sequence_number : str

The unique identifier of the record within its shard

Expand source code
@property
def sequence_number(self) -> str:
    """Unique identifier of the record within its shard."""
    payload = self["kinesis"]
    return payload["sequenceNumber"]

Methods

def data_as_bytes(self) ‑> bytes

Decode binary encoded data as bytes

Expand source code
def data_as_bytes(self) -> bytes:
    """Base64-decode the data blob and return the raw bytes."""
    encoded = self.data
    return base64.b64decode(encoded)
def data_as_json(self) ‑> dict

Decode binary encoded data as json

Expand source code
def data_as_json(self) -> dict:
    """Parse the decoded UTF-8 text of the data blob as JSON."""
    text = self.data_as_text()
    return json.loads(text)
def data_as_text(self) ‑> str

Decode binary encoded data as text

Expand source code
def data_as_text(self) -> str:
    """Decode the raw bytes of the data blob as UTF-8 text."""
    raw = self.data_as_bytes()
    return raw.decode("utf-8")
def data_zlib_compressed_as_json(self) ‑> dict

Decompress zlib- or gzip-compressed binary data and decode it as JSON

Expand source code
def data_zlib_compressed_as_json(self) -> dict:
    """Decompress the raw bytes of the data blob and parse the result as JSON."""
    compressed = self.data_as_bytes()
    # MAX_WBITS | 32 lets zlib auto-detect a zlib or gzip header.
    inflated = zlib.decompress(compressed, zlib.MAX_WBITS | 32)
    return json.loads(inflated)

Inherited members