Module aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event

Classes

class DynamoDBRecord (data: dict[str, Any], json_deserializer: Callable | None = None)

A description of a unique event within a stream

Parameters

data : dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class DynamoDBRecord(DictWrapper):
    """A description of a unique event within a stream"""

    @property
    def aws_region(self) -> str | None:
        """The region in which the GetRecords request was received"""
        return self.get("awsRegion")

    @property
    def dynamodb(self) -> StreamRecord | None:
        """The main body of the stream record, containing all the DynamoDB-specific dicts."""
        stream_record = self.get("dynamodb")
        return None if stream_record is None else StreamRecord(stream_record)

    @property
    def event_id(self) -> str | None:
        """A globally unique identifier for the event that was recorded in this stream record."""
        return self.get("eventID")

    @property
    def event_name(self) -> DynamoDBRecordEventName | None:
        """The type of data modification that was performed on the DynamoDB table"""
        item = self.get("eventName")
        return None if item is None else DynamoDBRecordEventName[item]

    @property
    def event_source(self) -> str | None:
        """The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb."""
        return self.get("eventSource")

    @property
    def event_source_arn(self) -> str | None:
        """The Amazon Resource Name (ARN) of the event source"""
        return self.get("eventSourceARN")

    @property
    def event_version(self) -> str | None:
        """The version number of the stream record format."""
        return self.get("eventVersion")

    @property
    def user_identity(self) -> dict:
        """Contains details about the type of identity that made the request"""
        return self.get("userIdentity") or {}

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container
  • typing.Generic

Instance variables

prop aws_region : str | None

The region in which the GetRecords request was received

Expand source code
@property
def aws_region(self) -> str | None:
    """The region in which the GetRecords request was received"""
    return self.get("awsRegion")
prop dynamodbStreamRecord | None

The main body of the stream record, containing all the DynamoDB-specific dicts.

Expand source code
@property
def dynamodb(self) -> StreamRecord | None:
    """The main body of the stream record, containing all the DynamoDB-specific dicts."""
    stream_record = self.get("dynamodb")
    return None if stream_record is None else StreamRecord(stream_record)
prop event_id : str | None

A globally unique identifier for the event that was recorded in this stream record.

Expand source code
@property
def event_id(self) -> str | None:
    """A globally unique identifier for the event that was recorded in this stream record."""
    return self.get("eventID")
prop event_nameDynamoDBRecordEventName | None

The type of data modification that was performed on the DynamoDB table

Expand source code
@property
def event_name(self) -> DynamoDBRecordEventName | None:
    """The type of data modification that was performed on the DynamoDB table"""
    item = self.get("eventName")
    return None if item is None else DynamoDBRecordEventName[item]
prop event_source : str | None

The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb.

Expand source code
@property
def event_source(self) -> str | None:
    """The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb."""
    return self.get("eventSource")
prop event_source_arn : str | None

The Amazon Resource Name (ARN) of the event source

Expand source code
@property
def event_source_arn(self) -> str | None:
    """The Amazon Resource Name (ARN) of the event source"""
    return self.get("eventSourceARN")
prop event_version : str | None

The version number of the stream record format.

Expand source code
@property
def event_version(self) -> str | None:
    """The version number of the stream record format."""
    return self.get("eventVersion")
prop user_identity : dict

Contains details about the type of identity that made the request

Expand source code
@property
def user_identity(self) -> dict:
    """Contains details about the type of identity that made the request"""
    return self.get("userIdentity") or {}

Inherited members

class DynamoDBRecordEventName (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code
class DynamoDBRecordEventName(Enum):
    INSERT = 0  # a new item was added to the table
    MODIFY = 1  # one or more of an existing item's attributes were modified
    REMOVE = 2  # the item was deleted from the table

Ancestors

  • enum.Enum

Class variables

var INSERT
var MODIFY
var REMOVE
class DynamoDBStreamEvent (data: dict[str, Any], json_deserializer: Callable | None = None)

Dynamo DB Stream Event

Documentation:

Example

Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.

from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    for record in event.records:
        # {"N": "123.45"} => Decimal("123.45")
        key: str = record.dynamodb.keys["id"]
        print(key)

Parameters

data : dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class DynamoDBStreamEvent(DictWrapper):
    """Dynamo DB Stream Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html

    Example
    -------
    **Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.**

        from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
        from aws_lambda_powertools.utilities.typing import LambdaContext


        @event_source(data_class=DynamoDBStreamEvent)
        def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
            for record in event.records:
                # {"N": "123.45"} => Decimal("123.45")
                key: str = record.dynamodb.keys["id"]
                print(key)
    """

    @property
    def records(self) -> Iterator[DynamoDBRecord]:
        for record in self["Records"]:
            yield DynamoDBRecord(record)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container
  • typing.Generic

Instance variables

prop records : Iterator[DynamoDBRecord]
Expand source code
@property
def records(self) -> Iterator[DynamoDBRecord]:
    for record in self["Records"]:
        yield DynamoDBRecord(record)

Inherited members

class StreamRecord (data: dict[str, Any])

Provides a single read only access to a wrapper dict

StreamRecord constructor Parameters


data : dict[str, Any]
Represents the dynamodb dict inside DynamoDBStreamEvent's records
Expand source code
class StreamRecord(DictWrapper):
    _deserializer = TypeDeserializer()

    def __init__(self, data: dict[str, Any]):
        """StreamRecord constructor
        Parameters
        ----------
        data: dict[str, Any]
            Represents the dynamodb dict inside DynamoDBStreamEvent's records
        """
        super().__init__(data)
        self._deserializer = TypeDeserializer()

    def _deserialize_dynamodb_dict(self, key: str) -> dict[str, Any]:
        """Deserialize DynamoDB records available in `Keys`, `NewImage`, and `OldImage`

        Parameters
        ----------
        key : str
            DynamoDB key (e.g., Keys, NewImage, or OldImage)

        Returns
        -------
        dict[str, Any]
            Deserialized records in Python native types
        """
        dynamodb_dict = self._data.get(key) or {}
        return {k: self._deserializer.deserialize(v) for k, v in dynamodb_dict.items()}

    @property
    def approximate_creation_date_time(self) -> int | None:
        """The approximate date and time when the stream record was created, in UNIX epoch time format."""
        item = self.get("ApproximateCreationDateTime")
        return None if item is None else int(item)

    @cached_property
    def keys(self) -> dict[str, Any]:  # type: ignore[override]
        """The primary key attribute(s) for the DynamoDB item that was modified."""
        return self._deserialize_dynamodb_dict("Keys")

    @cached_property
    def new_image(self) -> dict[str, Any]:
        """The item in the DynamoDB table as it appeared after it was modified."""
        return self._deserialize_dynamodb_dict("NewImage")

    @cached_property
    def old_image(self) -> dict[str, Any]:
        """The item in the DynamoDB table as it appeared before it was modified."""
        return self._deserialize_dynamodb_dict("OldImage")

    @property
    def sequence_number(self) -> str | None:
        """The sequence number of the stream record."""
        return self.get("SequenceNumber")

    @property
    def size_bytes(self) -> int | None:
        """The size of the stream record, in bytes."""
        item = self.get("SizeBytes")
        return None if item is None else int(item)

    @property
    def stream_view_type(self) -> StreamViewType | None:
        """The type of data from the modified DynamoDB item that was captured in this stream record"""
        item = self.get("StreamViewType")
        return None if item is None else StreamViewType[str(item)]

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container
  • typing.Generic

Instance variables

prop approximate_creation_date_time : int | None

The approximate date and time when the stream record was created, in UNIX epoch time format.

Expand source code
@property
def approximate_creation_date_time(self) -> int | None:
    """The approximate date and time when the stream record was created, in UNIX epoch time format."""
    item = self.get("ApproximateCreationDateTime")
    return None if item is None else int(item)
var keys

The primary key attribute(s) for the DynamoDB item that was modified.

Expand source code
def __get__(self, instance, owner=None):
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        val = self.func(instance)
        try:
            cache[self.attrname] = val
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
    return val
var new_image

The item in the DynamoDB table as it appeared after it was modified.

Expand source code
def __get__(self, instance, owner=None):
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        val = self.func(instance)
        try:
            cache[self.attrname] = val
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
    return val
var old_image

The item in the DynamoDB table as it appeared before it was modified.

Expand source code
def __get__(self, instance, owner=None):
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        val = self.func(instance)
        try:
            cache[self.attrname] = val
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
    return val
prop sequence_number : str | None

The sequence number of the stream record.

Expand source code
@property
def sequence_number(self) -> str | None:
    """The sequence number of the stream record."""
    return self.get("SequenceNumber")
prop size_bytes : int | None

The size of the stream record, in bytes.

Expand source code
@property
def size_bytes(self) -> int | None:
    """The size of the stream record, in bytes."""
    item = self.get("SizeBytes")
    return None if item is None else int(item)
prop stream_view_typeStreamViewType | None

The type of data from the modified DynamoDB item that was captured in this stream record

Expand source code
@property
def stream_view_type(self) -> StreamViewType | None:
    """The type of data from the modified DynamoDB item that was captured in this stream record"""
    item = self.get("StreamViewType")
    return None if item is None else StreamViewType[str(item)]

Inherited members

class StreamViewType (*args, **kwds)

The type of data from the modified DynamoDB item that was captured in this stream record

Expand source code
class StreamViewType(Enum):
    """The type of data from the modified DynamoDB item that was captured in this stream record"""

    KEYS_ONLY = 0  # only the key attributes of the modified item
    NEW_IMAGE = 1  # the entire item, as it appeared after it was modified.
    OLD_IMAGE = 2  # the entire item, as it appeared before it was modified.
    NEW_AND_OLD_IMAGES = 3  # both the new and the old item images of the item.

Ancestors

  • enum.Enum

Class variables

var KEYS_ONLY
var NEW_AND_OLD_IMAGES
var NEW_IMAGE
var OLD_IMAGE