Module aws_lambda_powertools.utilities.data_classes.kinesis_stream_event
Expand source code
import base64
import json
import zlib
from typing import Iterator, List
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import (
CloudWatchLogsDecodedData,
)
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
class KinesisStreamRecordPayload(DictWrapper):
@property
def approximate_arrival_timestamp(self) -> float:
"""The approximate time that the record was inserted into the stream"""
return float(self["kinesis"]["approximateArrivalTimestamp"])
@property
def data(self) -> str:
"""The data blob"""
return self["kinesis"]["data"]
@property
def kinesis_schema_version(self) -> str:
"""Schema version for the record"""
return self["kinesis"]["kinesisSchemaVersion"]
@property
def partition_key(self) -> str:
"""Identifies which shard in the stream the data record is assigned to"""
return self["kinesis"]["partitionKey"]
@property
def sequence_number(self) -> str:
"""The unique identifier of the record within its shard"""
return self["kinesis"]["sequenceNumber"]
def data_as_bytes(self) -> bytes:
"""Decode binary encoded data as bytes"""
return base64.b64decode(self.data)
def data_as_text(self) -> str:
"""Decode binary encoded data as text"""
return self.data_as_bytes().decode("utf-8")
def data_as_json(self) -> dict:
"""Decode binary encoded data as json"""
return json.loads(self.data_as_text())
def data_zlib_compressed_as_json(self) -> dict:
"""Decode binary encoded data as bytes"""
decompressed = zlib.decompress(self.data_as_bytes(), zlib.MAX_WBITS | 32)
return json.loads(decompressed)
class KinesisStreamRecord(DictWrapper):
@property
def aws_region(self) -> str:
"""AWS region where the event originated eg: us-east-1"""
return self["awsRegion"]
@property
def event_id(self) -> str:
"""A globally unique identifier for the event that was recorded in this stream record."""
return self["eventID"]
@property
def event_name(self) -> str:
"""Event type eg: aws:kinesis:record"""
return self["eventName"]
@property
def event_source(self) -> str:
"""The AWS service from which the Kinesis event originated. For Kinesis, this is aws:kinesis"""
return self["eventSource"]
@property
def event_source_arn(self) -> str:
"""The Amazon Resource Name (ARN) of the event source"""
return self["eventSourceARN"]
@property
def event_version(self) -> str:
"""The eventVersion key value contains a major and minor version in the form <major>.<minor>."""
return self["eventVersion"]
@property
def invoke_identity_arn(self) -> str:
"""The ARN for the identity used to invoke the Lambda Function"""
return self["invokeIdentityArn"]
@property
def kinesis(self) -> KinesisStreamRecordPayload:
"""Underlying Kinesis record associated with the event"""
return KinesisStreamRecordPayload(self._data)
class KinesisStreamEvent(DictWrapper):
"""Kinesis stream event
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
"""
@property
def records(self) -> Iterator[KinesisStreamRecord]:
for record in self["Records"]:
yield KinesisStreamRecord(record)
def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> List[CloudWatchLogsDecodedData]:
return [CloudWatchLogsDecodedData(record.kinesis.data_zlib_compressed_as_json()) for record in event.records]
def extract_cloudwatch_logs_from_record(record: KinesisStreamRecord) -> CloudWatchLogsDecodedData:
return CloudWatchLogsDecodedData(data=record.kinesis.data_zlib_compressed_as_json())
Functions
def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) ‑> List[CloudWatchLogsDecodedData]
-
Expand source code
def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> List[CloudWatchLogsDecodedData]: return [CloudWatchLogsDecodedData(record.kinesis.data_zlib_compressed_as_json()) for record in event.records]
def extract_cloudwatch_logs_from_record(record: KinesisStreamRecord) ‑> CloudWatchLogsDecodedData
-
Expand source code
def extract_cloudwatch_logs_from_record(record: KinesisStreamRecord) -> CloudWatchLogsDecodedData: return CloudWatchLogsDecodedData(data=record.kinesis.data_zlib_compressed_as_json())
Classes
class KinesisStreamEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Kinesis stream event
Documentation:
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KinesisStreamEvent(DictWrapper): """Kinesis stream event Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html """ @property def records(self) -> Iterator[KinesisStreamRecord]: for record in self["Records"]: yield KinesisStreamRecord(record)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var records : Iterator[KinesisStreamRecord]
-
Expand source code
@property def records(self) -> Iterator[KinesisStreamRecord]: for record in self["Records"]: yield KinesisStreamRecord(record)
Inherited members
class KinesisStreamRecord (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KinesisStreamRecord(DictWrapper): @property def aws_region(self) -> str: """AWS region where the event originated eg: us-east-1""" return self["awsRegion"] @property def event_id(self) -> str: """A globally unique identifier for the event that was recorded in this stream record.""" return self["eventID"] @property def event_name(self) -> str: """Event type eg: aws:kinesis:record""" return self["eventName"] @property def event_source(self) -> str: """The AWS service from which the Kinesis event originated. For Kinesis, this is aws:kinesis""" return self["eventSource"] @property def event_source_arn(self) -> str: """The Amazon Resource Name (ARN) of the event source""" return self["eventSourceARN"] @property def event_version(self) -> str: """The eventVersion key value contains a major and minor version in the form <major>.<minor>.""" return self["eventVersion"] @property def invoke_identity_arn(self) -> str: """The ARN for the identity used to invoke the Lambda Function""" return self["invokeIdentityArn"] @property def kinesis(self) -> KinesisStreamRecordPayload: """Underlying Kinesis record associated with the event""" return KinesisStreamRecordPayload(self._data)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var aws_region : str
-
AWS region where the event originated eg: us-east-1
Expand source code
@property def aws_region(self) -> str: """AWS region where the event originated eg: us-east-1""" return self["awsRegion"]
var event_id : str
-
A globally unique identifier for the event that was recorded in this stream record.
Expand source code
@property def event_id(self) -> str: """A globally unique identifier for the event that was recorded in this stream record.""" return self["eventID"]
var event_name : str
-
Event type eg: aws:kinesis:record
Expand source code
@property def event_name(self) -> str: """Event type eg: aws:kinesis:record""" return self["eventName"]
var event_source : str
-
The AWS service from which the Kinesis event originated. For Kinesis, this is aws:kinesis
Expand source code
@property def event_source(self) -> str: """The AWS service from which the Kinesis event originated. For Kinesis, this is aws:kinesis""" return self["eventSource"]
var event_source_arn : str
-
The Amazon Resource Name (ARN) of the event source
Expand source code
@property def event_source_arn(self) -> str: """The Amazon Resource Name (ARN) of the event source""" return self["eventSourceARN"]
var event_version : str
-
The eventVersion key value contains a major and minor version in the form
. . Expand source code
@property def event_version(self) -> str: """The eventVersion key value contains a major and minor version in the form <major>.<minor>.""" return self["eventVersion"]
var invoke_identity_arn : str
-
The ARN for the identity used to invoke the Lambda Function
Expand source code
@property def invoke_identity_arn(self) -> str: """The ARN for the identity used to invoke the Lambda Function""" return self["invokeIdentityArn"]
var kinesis : KinesisStreamRecordPayload
-
Underlying Kinesis record associated with the event
Expand source code
@property def kinesis(self) -> KinesisStreamRecordPayload: """Underlying Kinesis record associated with the event""" return KinesisStreamRecordPayload(self._data)
Inherited members
class KinesisStreamRecordPayload (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KinesisStreamRecordPayload(DictWrapper): @property def approximate_arrival_timestamp(self) -> float: """The approximate time that the record was inserted into the stream""" return float(self["kinesis"]["approximateArrivalTimestamp"]) @property def data(self) -> str: """The data blob""" return self["kinesis"]["data"] @property def kinesis_schema_version(self) -> str: """Schema version for the record""" return self["kinesis"]["kinesisSchemaVersion"] @property def partition_key(self) -> str: """Identifies which shard in the stream the data record is assigned to""" return self["kinesis"]["partitionKey"] @property def sequence_number(self) -> str: """The unique identifier of the record within its shard""" return self["kinesis"]["sequenceNumber"] def data_as_bytes(self) -> bytes: """Decode binary encoded data as bytes""" return base64.b64decode(self.data) def data_as_text(self) -> str: """Decode binary encoded data as text""" return self.data_as_bytes().decode("utf-8") def data_as_json(self) -> dict: """Decode binary encoded data as json""" return json.loads(self.data_as_text()) def data_zlib_compressed_as_json(self) -> dict: """Decode binary encoded data as bytes""" decompressed = zlib.decompress(self.data_as_bytes(), zlib.MAX_WBITS | 32) return json.loads(decompressed)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var approximate_arrival_timestamp : float
-
The approximate time that the record was inserted into the stream
Expand source code
@property def approximate_arrival_timestamp(self) -> float: """The approximate time that the record was inserted into the stream""" return float(self["kinesis"]["approximateArrivalTimestamp"])
var data : str
-
The data blob
Expand source code
@property def data(self) -> str: """The data blob""" return self["kinesis"]["data"]
var kinesis_schema_version : str
-
Schema version for the record
Expand source code
@property def kinesis_schema_version(self) -> str: """Schema version for the record""" return self["kinesis"]["kinesisSchemaVersion"]
var partition_key : str
-
Identifies which shard in the stream the data record is assigned to
Expand source code
@property def partition_key(self) -> str: """Identifies which shard in the stream the data record is assigned to""" return self["kinesis"]["partitionKey"]
var sequence_number : str
-
The unique identifier of the record within its shard
Expand source code
@property def sequence_number(self) -> str: """The unique identifier of the record within its shard""" return self["kinesis"]["sequenceNumber"]
Methods
def data_as_bytes(self) ‑> bytes
-
Decode binary encoded data as bytes
Expand source code
def data_as_bytes(self) -> bytes: """Decode binary encoded data as bytes""" return base64.b64decode(self.data)
def data_as_json(self) ‑> dict
-
Decode binary encoded data as json
Expand source code
def data_as_json(self) -> dict: """Decode binary encoded data as json""" return json.loads(self.data_as_text())
def data_as_text(self) ‑> str
-
Decode binary encoded data as text
Expand source code
def data_as_text(self) -> str: """Decode binary encoded data as text""" return self.data_as_bytes().decode("utf-8")
def data_zlib_compressed_as_json(self) ‑> dict
-
Decode binary encoded data as bytes
Expand source code
def data_zlib_compressed_as_json(self) -> dict: """Decode binary encoded data as bytes""" decompressed = zlib.decompress(self.data_as_bytes(), zlib.MAX_WBITS | 32) return json.loads(decompressed)
Inherited members