Module aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event
Classes
class DynamoDBRecord (data: dict[str, Any], json_deserializer: Callable | None = None)
-
A description of a unique event within a stream
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize a str, bytes, or bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class DynamoDBRecord(DictWrapper):
    """A description of a unique event within a stream.

    Each accessor reads one top-level key of the wrapped record. Apart from
    ``user_identity`` (which falls back to an empty dict), an accessor
    returns ``None`` when its key is absent.
    """

    @property
    def aws_region(self) -> str | None:
        """The region in which the GetRecords request was received."""
        return self.get("awsRegion")

    @property
    def dynamodb(self) -> StreamRecord | None:
        """The main body of the stream record, containing all the DynamoDB-specific dicts.

        The raw ``dynamodb`` dict is wrapped in a ``StreamRecord``.
        """
        payload = self.get("dynamodb")
        if payload is None:
            return None
        return StreamRecord(payload)

    @property
    def event_id(self) -> str | None:
        """A globally unique identifier for the event that was recorded in this stream record."""
        return self.get("eventID")

    @property
    def event_name(self) -> DynamoDBRecordEventName | None:
        """The type of data modification that was performed on the DynamoDB table.

        The raw ``eventName`` string is resolved by member name on
        ``DynamoDBRecordEventName``.
        """
        raw_name = self.get("eventName")
        if raw_name is None:
            return None
        return DynamoDBRecordEventName[raw_name]

    @property
    def event_source(self) -> str | None:
        """The AWS service from which the stream record originated.

        For DynamoDB Streams, this is aws:dynamodb.
        """
        return self.get("eventSource")

    @property
    def event_source_arn(self) -> str | None:
        """The Amazon Resource Name (ARN) of the event source."""
        return self.get("eventSourceARN")

    @property
    def event_version(self) -> str | None:
        """The version number of the stream record format."""
        return self.get("eventVersion")

    @property
    def user_identity(self) -> dict:
        """Contains details about the type of identity that made the request.

        An empty dict is returned when ``userIdentity`` is missing or falsy.
        """
        identity = self.get("userIdentity")
        return identity if identity else {}
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop aws_region : str | None
-
The region in which the GetRecords request was received
Expand source code
@property
def aws_region(self) -> str | None:
    """The region in which the GetRecords request was received.

    Read from the ``awsRegion`` key.
    """
    return self.get("awsRegion")
prop dynamodb : StreamRecord | None
-
The main body of the stream record, containing all the DynamoDB-specific dicts.
Expand source code
@property
def dynamodb(self) -> StreamRecord | None:
    """The main body of the stream record, containing all the DynamoDB-specific dicts.

    The raw ``dynamodb`` dict is wrapped in a ``StreamRecord``; ``None`` is
    returned when the stored value is ``None``.
    """
    payload = self.get("dynamodb")
    if payload is None:
        return None
    return StreamRecord(payload)
prop event_id : str | None
-
A globally unique identifier for the event that was recorded in this stream record.
Expand source code
@property
def event_id(self) -> str | None:
    """A globally unique identifier for the event that was recorded in this stream record.

    Read from the ``eventID`` key.
    """
    return self.get("eventID")
prop event_name : DynamoDBRecordEventName | None
-
The type of data modification that was performed on the DynamoDB table
Expand source code
@property
def event_name(self) -> DynamoDBRecordEventName | None:
    """The type of data modification that was performed on the DynamoDB table.

    The raw ``eventName`` string is resolved by member name on
    ``DynamoDBRecordEventName``; ``None`` is returned when the stored value
    is ``None``.
    """
    raw_name = self.get("eventName")
    if raw_name is None:
        return None
    return DynamoDBRecordEventName[raw_name]
prop event_source : str | None
-
The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb.
Expand source code
@property
def event_source(self) -> str | None:
    """The AWS service from which the stream record originated.

    For DynamoDB Streams, this is aws:dynamodb. Read from the
    ``eventSource`` key.
    """
    return self.get("eventSource")
prop event_source_arn : str | None
-
The Amazon Resource Name (ARN) of the event source
Expand source code
@property
def event_source_arn(self) -> str | None:
    """The Amazon Resource Name (ARN) of the event source.

    Read from the ``eventSourceARN`` key.
    """
    return self.get("eventSourceARN")
prop event_version : str | None
-
The version number of the stream record format.
Expand source code
@property
def event_version(self) -> str | None:
    """The version number of the stream record format.

    Read from the ``eventVersion`` key.
    """
    return self.get("eventVersion")
prop user_identity : dict
-
Contains details about the type of identity that made the request
Expand source code
@property
def user_identity(self) -> dict:
    """Contains details about the type of identity that made the request.

    An empty dict is returned when ``userIdentity`` is missing or falsy.
    """
    identity = self.get("userIdentity")
    return identity if identity else {}
Inherited members
class DynamoDBRecordEventName (*args, **kwds)
-
Create a collection of name/value pairs.
Example enumeration:
>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3
Access them by:
- attribute access:
Color.RED
- value lookup:
Color(1)
- name lookup:
Color['RED']
Enumerations can be iterated over, and know how many members they have:
>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]
Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.
Expand source code
class DynamoDBRecordEventName(Enum):
    """The type of data modification that was performed on the DynamoDB table.

    Members are looked up by name from a record's ``eventName`` string.
    """

    # A new item was added to the table.
    INSERT = 0
    # One or more of an existing item's attributes were modified.
    MODIFY = 1
    # The item was deleted from the table.
    REMOVE = 2
Ancestors
- enum.Enum
Class variables
var INSERT
var MODIFY
var REMOVE
class DynamoDBStreamEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Dynamo DB Stream Event
Documentation: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html
Example
Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.
from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    for record in event.records:
        # `record.dynamodb` is typed `StreamRecord | None`, so guard before use.
        stream_record = record.dynamodb
        if stream_record is None:
            continue
        # {"N": "123.45"} => Decimal("123.45")
        key: str = stream_record.keys["id"]
        print(key)
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize a str, bytes, or bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class DynamoDBStreamEvent(DictWrapper):
    """Dynamo DB Stream Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html

    Example
    -------
    **Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.**

        from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
        from aws_lambda_powertools.utilities.typing import LambdaContext


        @event_source(data_class=DynamoDBStreamEvent)
        def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
            for record in event.records:
                # {"N": "123.45"} => Decimal("123.45")
                key: str = record.dynamodb.keys["id"]
                print(key)
    """

    @property
    def records(self) -> Iterator[DynamoDBRecord]:
        """Lazily yield one ``DynamoDBRecord`` per entry of the ``Records`` list."""
        yield from (DynamoDBRecord(entry) for entry in self["Records"])
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop records : Iterator[DynamoDBRecord]
-
Expand source code
@property
def records(self) -> Iterator[DynamoDBRecord]:
    """Lazily yield one ``DynamoDBRecord`` per entry of the ``Records`` list.

    The ``Records`` key is only read once iteration starts, so a missing key
    raises ``KeyError`` on the first ``next()`` rather than on attribute access.
    """
    yield from (DynamoDBRecord(entry) for entry in self["Records"])
Inherited members
class StreamRecord (data: dict[str, Any])
-
Provides read-only access to a wrapped dict
StreamRecord constructor
Parameters
data
:dict[str, Any]
- Represents the dynamodb dict inside DynamoDBStreamEvent's records
Expand source code
class StreamRecord(DictWrapper):
    # Class-level deserializer; ``__init__`` additionally binds a fresh one on
    # every instance, which is the one instance methods end up using.
    _deserializer = TypeDeserializer()

    def __init__(self, data: dict[str, Any]):
        """StreamRecord constructor

        Parameters
        ----------
        data: dict[str, Any]
            Represents the dynamodb dict inside DynamoDBStreamEvent's records
        """
        super().__init__(data)
        self._deserializer = TypeDeserializer()

    def _deserialize_dynamodb_dict(self, key: str) -> dict[str, Any]:
        """Deserialize DynamoDB records available in `Keys`, `NewImage`, and `OldImage`

        Parameters
        ----------
        key : str
            DynamoDB key (e.g., Keys, NewImage, or OldImage)

        Returns
        -------
        dict[str, Any]
            Deserialized records in Python native types; empty when ``key``
            is missing or holds a falsy value
        """
        attributes = self._data.get(key) or {}
        to_python = self._deserializer.deserialize
        return {name: to_python(value) for name, value in attributes.items()}

    @property
    def approximate_creation_date_time(self) -> int | None:
        """The approximate date and time when the stream record was created, in UNIX epoch time format."""
        raw = self.get("ApproximateCreationDateTime")
        if raw is None:
            return None
        return int(raw)

    @cached_property
    def keys(self) -> dict[str, Any]:  # type: ignore[override]
        """The primary key attribute(s) for the DynamoDB item that was modified."""
        return self._deserialize_dynamodb_dict("Keys")

    @cached_property
    def new_image(self) -> dict[str, Any]:
        """The item in the DynamoDB table as it appeared after it was modified."""
        return self._deserialize_dynamodb_dict("NewImage")

    @cached_property
    def old_image(self) -> dict[str, Any]:
        """The item in the DynamoDB table as it appeared before it was modified."""
        return self._deserialize_dynamodb_dict("OldImage")

    @property
    def sequence_number(self) -> str | None:
        """The sequence number of the stream record."""
        return self.get("SequenceNumber")

    @property
    def size_bytes(self) -> int | None:
        """The size of the stream record, in bytes."""
        raw = self.get("SizeBytes")
        if raw is None:
            return None
        return int(raw)

    @property
    def stream_view_type(self) -> StreamViewType | None:
        """The type of data from the modified DynamoDB item that was captured in this
        stream record"""
        raw = self.get("StreamViewType")
        if raw is None:
            return None
        return StreamViewType[str(raw)]
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop approximate_creation_date_time : int | None
-
The approximate date and time when the stream record was created, in UNIX epoch time format.
Expand source code
@property
def approximate_creation_date_time(self) -> int | None:
    """The approximate date and time when the stream record was created, in UNIX epoch time format.

    The stored ``ApproximateCreationDateTime`` value is passed through
    ``int()``; ``None`` is returned when it is absent.
    """
    raw = self.get("ApproximateCreationDateTime")
    if raw is None:
        return None
    return int(raw)
var keys
-
The primary key attribute(s) for the DynamoDB item that was modified.
Expand source code
def __get__(self, instance, owner=None):
    """Return the cached value for ``instance``, computing it on first access.

    The result of ``self.func(instance)`` is stored in the instance
    ``__dict__`` under ``self.attrname``. Accessing the descriptor on the
    class itself (``instance is None``) returns the descriptor.

    Raises
    ------
    TypeError
        If ``__set_name__`` was never called, if the instance has no
        ``__dict__``, or if that ``__dict__`` rejects item assignment.
    """
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        instance_dict = instance.__dict__
    except AttributeError:
        # Not all objects have __dict__ (e.g. class defines slots).
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    cached = instance_dict.get(self.attrname, _NOT_FOUND)
    if cached is not _NOT_FOUND:
        return cached
    # Cache miss: compute once, then try to remember the result.
    computed = self.func(instance)
    try:
        instance_dict[self.attrname] = computed
    except TypeError:
        msg = (
            f"The '__dict__' attribute on {type(instance).__name__!r} instance "
            f"does not support item assignment for caching {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    return computed
var new_image
-
The item in the DynamoDB table as it appeared after it was modified.
Expand source code
def __get__(self, instance, owner=None):
    """Return the cached value for ``instance``, computing it on first access.

    The result of ``self.func(instance)`` is stored in the instance
    ``__dict__`` under ``self.attrname``. Accessing the descriptor on the
    class itself (``instance is None``) returns the descriptor.

    Raises
    ------
    TypeError
        If ``__set_name__`` was never called, if the instance has no
        ``__dict__``, or if that ``__dict__`` rejects item assignment.
    """
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        instance_dict = instance.__dict__
    except AttributeError:
        # Not all objects have __dict__ (e.g. class defines slots).
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    cached = instance_dict.get(self.attrname, _NOT_FOUND)
    if cached is not _NOT_FOUND:
        return cached
    # Cache miss: compute once, then try to remember the result.
    computed = self.func(instance)
    try:
        instance_dict[self.attrname] = computed
    except TypeError:
        msg = (
            f"The '__dict__' attribute on {type(instance).__name__!r} instance "
            f"does not support item assignment for caching {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    return computed
var old_image
-
The item in the DynamoDB table as it appeared before it was modified.
Expand source code
def __get__(self, instance, owner=None):
    """Return the cached value for ``instance``, computing it on first access.

    The result of ``self.func(instance)`` is stored in the instance
    ``__dict__`` under ``self.attrname``. Accessing the descriptor on the
    class itself (``instance is None``) returns the descriptor.

    Raises
    ------
    TypeError
        If ``__set_name__`` was never called, if the instance has no
        ``__dict__``, or if that ``__dict__`` rejects item assignment.
    """
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        instance_dict = instance.__dict__
    except AttributeError:
        # Not all objects have __dict__ (e.g. class defines slots).
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    cached = instance_dict.get(self.attrname, _NOT_FOUND)
    if cached is not _NOT_FOUND:
        return cached
    # Cache miss: compute once, then try to remember the result.
    computed = self.func(instance)
    try:
        instance_dict[self.attrname] = computed
    except TypeError:
        msg = (
            f"The '__dict__' attribute on {type(instance).__name__!r} instance "
            f"does not support item assignment for caching {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    return computed
prop sequence_number : str | None
-
The sequence number of the stream record.
Expand source code
@property
def sequence_number(self) -> str | None:
    """The sequence number of the stream record.

    Read from the ``SequenceNumber`` key.
    """
    return self.get("SequenceNumber")
prop size_bytes : int | None
-
The size of the stream record, in bytes.
Expand source code
@property
def size_bytes(self) -> int | None:
    """The size of the stream record, in bytes.

    The stored ``SizeBytes`` value is passed through ``int()``; ``None`` is
    returned when it is absent.
    """
    raw = self.get("SizeBytes")
    if raw is None:
        return None
    return int(raw)
prop stream_view_type : StreamViewType | None
-
The type of data from the modified DynamoDB item that was captured in this stream record
Expand source code
@property
def stream_view_type(self) -> StreamViewType | None:
    """The type of data from the modified DynamoDB item that was captured in this stream record.

    The stored ``StreamViewType`` value is converted with ``str()`` and
    resolved by member name on ``StreamViewType``; ``None`` is returned when
    it is absent.
    """
    raw = self.get("StreamViewType")
    if raw is None:
        return None
    return StreamViewType[str(raw)]
Inherited members
class StreamViewType (*args, **kwds)
-
The type of data from the modified DynamoDB item that was captured in this stream record
Expand source code
class StreamViewType(Enum):
    """The type of data from the modified DynamoDB item that was captured in this stream record"""

    # Only the key attributes of the modified item.
    KEYS_ONLY = 0
    # The entire item, as it appeared after it was modified.
    NEW_IMAGE = 1
    # The entire item, as it appeared before it was modified.
    OLD_IMAGE = 2
    # Both the new and the old item images of the item.
    NEW_AND_OLD_IMAGES = 3
Ancestors
- enum.Enum
Class variables
var KEYS_ONLY
var NEW_AND_OLD_IMAGES
var NEW_IMAGE
var OLD_IMAGE