Module aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event

Classes

class DynamoDBRecord (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

A description of a unique event within a stream

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class DynamoDBRecord(DictWrapper):
    """A description of a unique event within a stream"""

    @property
    def aws_region(self) -> Optional[str]:
        """The region in which the GetRecords request was received"""
        return self.get("awsRegion")

    @property
    def dynamodb(self) -> Optional[StreamRecord]:
        """The main body of the stream record, containing all the DynamoDB-specific dicts."""
        stream_record = self.get("dynamodb")
        return None if stream_record is None else StreamRecord(stream_record)

    @property
    def event_id(self) -> Optional[str]:
        """A globally unique identifier for the event that was recorded in this stream record."""
        return self.get("eventID")

    @property
    def event_name(self) -> Optional[DynamoDBRecordEventName]:
        """The type of data modification that was performed on the DynamoDB table"""
        item = self.get("eventName")
        return None if item is None else DynamoDBRecordEventName[item]

    @property
    def event_source(self) -> Optional[str]:
        """The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb."""
        return self.get("eventSource")

    @property
    def event_source_arn(self) -> Optional[str]:
        """The Amazon Resource Name (ARN) of the event source"""
        return self.get("eventSourceARN")

    @property
    def event_version(self) -> Optional[str]:
        """The version number of the stream record format."""
        return self.get("eventVersion")

    @property
    def user_identity(self) -> Optional[dict]:
        """Contains details about the type of identity that made the request"""
        return self.get("userIdentity")

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

prop aws_region : Optional[str]

The region in which the GetRecords request was received

Expand source code
@property
def aws_region(self) -> Optional[str]:
    """The region in which the GetRecords request was received"""
    return self.get("awsRegion")
prop dynamodb : Optional[StreamRecord]

The main body of the stream record, containing all the DynamoDB-specific dicts.

Expand source code
@property
def dynamodb(self) -> Optional[StreamRecord]:
    """The main body of the stream record, containing all the DynamoDB-specific dicts."""
    stream_record = self.get("dynamodb")
    return None if stream_record is None else StreamRecord(stream_record)
prop event_id : Optional[str]

A globally unique identifier for the event that was recorded in this stream record.

Expand source code
@property
def event_id(self) -> Optional[str]:
    """A globally unique identifier for the event that was recorded in this stream record."""
    return self.get("eventID")
prop event_name : Optional[DynamoDBRecordEventName]

The type of data modification that was performed on the DynamoDB table

Expand source code
@property
def event_name(self) -> Optional[DynamoDBRecordEventName]:
    """The type of data modification that was performed on the DynamoDB table"""
    item = self.get("eventName")
    return None if item is None else DynamoDBRecordEventName[item]
prop event_source : Optional[str]

The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb.

Expand source code
@property
def event_source(self) -> Optional[str]:
    """The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb."""
    return self.get("eventSource")
prop event_source_arn : Optional[str]

The Amazon Resource Name (ARN) of the event source

Expand source code
@property
def event_source_arn(self) -> Optional[str]:
    """The Amazon Resource Name (ARN) of the event source"""
    return self.get("eventSourceARN")
prop event_version : Optional[str]

The version number of the stream record format.

Expand source code
@property
def event_version(self) -> Optional[str]:
    """The version number of the stream record format."""
    return self.get("eventVersion")
prop user_identity : Optional[dict]

Contains details about the type of identity that made the request

Expand source code
@property
def user_identity(self) -> Optional[dict]:
    """Contains details about the type of identity that made the request"""
    return self.get("userIdentity")

Inherited members

class DynamoDBRecordEventName (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code
class DynamoDBRecordEventName(Enum):
    INSERT = 0  # a new item was added to the table
    MODIFY = 1  # one or more of an existing item's attributes were modified
    REMOVE = 2  # the item was deleted from the table

Ancestors

  • enum.Enum

Class variables

var INSERT
var MODIFY
var REMOVE
class DynamoDBStreamEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Dynamo DB Stream Event

Documentation:

Example

Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.

from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    for record in event.records:
        # {"N": "123.45"} => Decimal("123.45")
        key: str = record.dynamodb.keys["id"]
        print(key)

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class DynamoDBStreamEvent(DictWrapper):
    """Dynamo DB Stream Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html

    Example
    -------
    **Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.**

        from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
        from aws_lambda_powertools.utilities.typing import LambdaContext


        @event_source(data_class=DynamoDBStreamEvent)
        def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
            for record in event.records:
                # {"N": "123.45"} => Decimal("123.45")
                key: str = record.dynamodb.keys["id"]
                print(key)
    """

    @property
    def records(self) -> Iterator[DynamoDBRecord]:
        for record in self["Records"]:
            yield DynamoDBRecord(record)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

prop records : Iterator[DynamoDBRecord]
Expand source code
@property
def records(self) -> Iterator[DynamoDBRecord]:
    for record in self["Records"]:
        yield DynamoDBRecord(record)

Inherited members

class StreamRecord (data: Dict[str, Any])

Provides a single read only access to a wrapper dict

StreamRecord constructor Parameters


data : Dict[str, Any]
Represents the dynamodb dict inside DynamoDBStreamEvent's records
Expand source code
class StreamRecord(DictWrapper):
    _deserializer = TypeDeserializer()

    def __init__(self, data: Dict[str, Any]):
        """StreamRecord constructor
        Parameters
        ----------
        data: Dict[str, Any]
            Represents the dynamodb dict inside DynamoDBStreamEvent's records
        """
        super().__init__(data)
        self._deserializer = TypeDeserializer()

    def _deserialize_dynamodb_dict(self, key: str) -> Optional[Dict[str, Any]]:
        """Deserialize DynamoDB records available in `Keys`, `NewImage`, and `OldImage`

        Parameters
        ----------
        key : str
            DynamoDB key (e.g., Keys, NewImage, or OldImage)

        Returns
        -------
        Optional[Dict[str, Any]]
            Deserialized records in Python native types
        """
        dynamodb_dict = self._data.get(key)
        if dynamodb_dict is None:
            return None

        return {k: self._deserializer.deserialize(v) for k, v in dynamodb_dict.items()}

    @property
    def approximate_creation_date_time(self) -> Optional[int]:
        """The approximate date and time when the stream record was created, in UNIX epoch time format."""
        item = self.get("ApproximateCreationDateTime")
        return None if item is None else int(item)

    @property
    def keys(self) -> Optional[Dict[str, Any]]:  # type: ignore[override]
        """The primary key attribute(s) for the DynamoDB item that was modified."""
        return self._deserialize_dynamodb_dict("Keys")

    @property
    def new_image(self) -> Optional[Dict[str, Any]]:
        """The item in the DynamoDB table as it appeared after it was modified."""
        return self._deserialize_dynamodb_dict("NewImage")

    @property
    def old_image(self) -> Optional[Dict[str, Any]]:
        """The item in the DynamoDB table as it appeared before it was modified."""
        return self._deserialize_dynamodb_dict("OldImage")

    @property
    def sequence_number(self) -> Optional[str]:
        """The sequence number of the stream record."""
        return self.get("SequenceNumber")

    @property
    def size_bytes(self) -> Optional[int]:
        """The size of the stream record, in bytes."""
        item = self.get("SizeBytes")
        return None if item is None else int(item)

    @property
    def stream_view_type(self) -> Optional[StreamViewType]:
        """The type of data from the modified DynamoDB item that was captured in this stream record"""
        item = self.get("StreamViewType")
        return None if item is None else StreamViewType[str(item)]

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

prop approximate_creation_date_time : Optional[int]

The approximate date and time when the stream record was created, in UNIX epoch time format.

Expand source code
@property
def approximate_creation_date_time(self) -> Optional[int]:
    """The approximate date and time when the stream record was created, in UNIX epoch time format."""
    item = self.get("ApproximateCreationDateTime")
    return None if item is None else int(item)
prop keys : Optional[Dict[str, Any]]

The primary key attribute(s) for the DynamoDB item that was modified.

Expand source code
@property
def keys(self) -> Optional[Dict[str, Any]]:  # type: ignore[override]
    """The primary key attribute(s) for the DynamoDB item that was modified."""
    return self._deserialize_dynamodb_dict("Keys")
prop new_image : Optional[Dict[str, Any]]

The item in the DynamoDB table as it appeared after it was modified.

Expand source code
@property
def new_image(self) -> Optional[Dict[str, Any]]:
    """The item in the DynamoDB table as it appeared after it was modified."""
    return self._deserialize_dynamodb_dict("NewImage")
prop old_image : Optional[Dict[str, Any]]

The item in the DynamoDB table as it appeared before it was modified.

Expand source code
@property
def old_image(self) -> Optional[Dict[str, Any]]:
    """The item in the DynamoDB table as it appeared before it was modified."""
    return self._deserialize_dynamodb_dict("OldImage")
prop sequence_number : Optional[str]

The sequence number of the stream record.

Expand source code
@property
def sequence_number(self) -> Optional[str]:
    """The sequence number of the stream record."""
    return self.get("SequenceNumber")
prop size_bytes : Optional[int]

The size of the stream record, in bytes.

Expand source code
@property
def size_bytes(self) -> Optional[int]:
    """The size of the stream record, in bytes."""
    item = self.get("SizeBytes")
    return None if item is None else int(item)
prop stream_view_type : Optional[StreamViewType]

The type of data from the modified DynamoDB item that was captured in this stream record

Expand source code
@property
def stream_view_type(self) -> Optional[StreamViewType]:
    """The type of data from the modified DynamoDB item that was captured in this stream record"""
    item = self.get("StreamViewType")
    return None if item is None else StreamViewType[str(item)]

Inherited members

class StreamViewType (*args, **kwds)

The type of data from the modified DynamoDB item that was captured in this stream record

Expand source code
class StreamViewType(Enum):
    """The type of data from the modified DynamoDB item that was captured in this stream record"""

    KEYS_ONLY = 0  # only the key attributes of the modified item
    NEW_IMAGE = 1  # the entire item, as it appeared after it was modified.
    OLD_IMAGE = 2  # the entire item, as it appeared before it was modified.
    NEW_AND_OLD_IMAGES = 3  # both the new and the old item images of the item.

Ancestors

  • enum.Enum

Class variables

var KEYS_ONLY
var NEW_AND_OLD_IMAGES
var NEW_IMAGE
var OLD_IMAGE
class TypeDeserializer

Deserializes DynamoDB types to Python types.

It's based on boto3's DynamoDB TypeDeserializer.

The only notable difference is that for Binary (B, BS) values we return Python Bytes directly, since we don't support Python 2.

Expand source code
class TypeDeserializer:
    """
    Deserializes DynamoDB types to Python types.

    It's based on boto3's [DynamoDB TypeDeserializer](https://boto3.amazonaws.com/v1/documentation/api/latest/_modules/boto3/dynamodb/types.html).

    The only notable difference is that for Binary (`B`, `BS`) values we return Python Bytes directly,
    since we don't support Python 2.
    """

    def deserialize(self, value: Dict) -> Any:
        """Deserialize DynamoDB data types into Python types.

        Parameters
        ----------
        value: Any
            DynamoDB value to be deserialized to a python type


            Here are the various conversions:

            DynamoDB                                Python
            --------                                ------
            {'NULL': True}                          None
            {'BOOL': True/False}                    True/False
            {'N': Decimal(value)}                   Decimal(value)
            {'S': string}                           string
            {'B': bytes}                            bytes
            {'NS': [str(value)]}                    set([str(value)])
            {'SS': [string]}                        set([string])
            {'BS': [bytes]}                         set([bytes])
            {'L': list}                             list
            {'M': dict}                             dict

        Parameters
        ----------
        value: Any
            DynamoDB value to be deserialized to a python type

        Returns
        --------
        any
            Python native type converted from DynamoDB type
        """

        dynamodb_type = list(value.keys())[0]
        deserializer: Optional[Callable] = getattr(self, f"_deserialize_{dynamodb_type}".lower(), None)
        if deserializer is None:
            raise TypeError(f"Dynamodb type {dynamodb_type} is not supported")

        return deserializer(value[dynamodb_type])

    def _deserialize_null(self, value: bool) -> None:
        return None

    def _deserialize_bool(self, value: bool) -> bool:
        return value

    def _deserialize_n(self, value: str) -> Decimal:
        # value is None or "."? It's zero
        # then return early
        value = value.lstrip("0")
        if not value or value == ".":
            return DYNAMODB_CONTEXT.create_decimal(0)

        if len(value) > 38:
            # See: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Number
            # Calculate the number of trailing zeros after the 38th character
            tail = len(value[38:]) - len(value[38:].rstrip("0"))
            # Trim the value: remove trailing zeros if any, or just take the first 38 characters
            value = value[:-tail] if tail > 0 else value[:38]

        return DYNAMODB_CONTEXT.create_decimal(value)

    def _deserialize_s(self, value: str) -> str:
        return value

    def _deserialize_b(self, value: bytes) -> bytes:
        return value

    def _deserialize_ns(self, value: Sequence[str]) -> Set[Decimal]:
        return set(map(self._deserialize_n, value))

    def _deserialize_ss(self, value: Sequence[str]) -> Set[str]:
        return set(map(self._deserialize_s, value))

    def _deserialize_bs(self, value: Sequence[bytes]) -> Set[bytes]:
        return set(map(self._deserialize_b, value))

    def _deserialize_l(self, value: Sequence[Dict]) -> Sequence[Any]:
        return [self.deserialize(v) for v in value]

    def _deserialize_m(self, value: Dict) -> Dict:
        return {k: self.deserialize(v) for k, v in value.items()}

Methods

def deserialize(self, value: Dict) ‑> Any

Deserialize DynamoDB data types into Python types.

Parameters

value : Any

DynamoDB value to be deserialized to a python type

Here are the various conversions:

DynamoDB Python -------- ------ {'NULL': True} None {'BOOL': True/False} True/False {'N': Decimal(value)} Decimal(value) {'S': string} string {'B': bytes} bytes {'NS': [str(value)]} set([str(value)]) {'SS': [string]} set([string]) {'BS': [bytes]} set([bytes]) {'L': list} list {'M': dict} dict

Parameters

value : Any
DynamoDB value to be deserialized to a python type

Returns

any
Python native type converted from DynamoDB type