Module aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event

Expand source code
from decimal import Clamped, Context, Decimal, Inexact, Overflow, Rounded, Underflow
from enum import Enum
from typing import Any, Callable, Dict, Iterator, Optional, Sequence, Set

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper

# NOTE: DynamoDB supports up to 38 digits precision
# Therefore, this ensures our Decimal follows what's stored in the table
    traps=[Clamped, Overflow, Inexact, Rounded, Underflow],

class TypeDeserializer:
    Deserializes DynamoDB types to Python types.

    It's based on boto3's [DynamoDB TypeDeserializer](

    The only notable difference is that for Binary (`B`, `BS`) values we return Python Bytes directly,
    since we don't support Python 2.

    def deserialize(self, value: Dict) -> Any:
        """Deserialize DynamoDB data types into Python types.

        value: Any
            DynamoDB value to be deserialized to a python type

            Here are the various conversions:

            DynamoDB                                Python
            --------                                ------
            {'NULL': True}                          None
            {'BOOL': True/False}                    True/False
            {'N': Decimal(value)}                   Decimal(value)
            {'S': string}                           string
            {'B': bytes}                            bytes
            {'NS': [str(value)]}                    set([str(value)])
            {'SS': [string]}                        set([string])
            {'BS': [bytes]}                         set([bytes])
            {'L': list}                             list
            {'M': dict}                             dict

        value: Any
            DynamoDB value to be deserialized to a python type

            Python native type converted from DynamoDB type

        dynamodb_type = list(value.keys())[0]
        deserializer: Optional[Callable] = getattr(self, f"_deserialize_{dynamodb_type}".lower(), None)
        if deserializer is None:
            raise TypeError(f"Dynamodb type {dynamodb_type} is not supported")

        return deserializer(value[dynamodb_type])

    def _deserialize_null(self, value: bool) -> None:
        return None

    def _deserialize_bool(self, value: bool) -> bool:
        return value

    def _deserialize_n(self, value: str) -> Decimal:
        return DYNAMODB_CONTEXT.create_decimal(value)

    def _deserialize_s(self, value: str) -> str:
        return value

    def _deserialize_b(self, value: bytes) -> bytes:
        return value

    def _deserialize_ns(self, value: Sequence[str]) -> Set[Decimal]:
        return set(map(self._deserialize_n, value))

    def _deserialize_ss(self, value: Sequence[str]) -> Set[str]:
        return set(map(self._deserialize_s, value))

    def _deserialize_bs(self, value: Sequence[bytes]) -> Set[bytes]:
        return set(map(self._deserialize_b, value))

    def _deserialize_l(self, value: Sequence[Dict]) -> Sequence[Any]:
        return [self.deserialize(v) for v in value]

    def _deserialize_m(self, value: Dict) -> Dict:
        return {k: self.deserialize(v) for k, v in value.items()}

class StreamViewType(Enum):
    """The type of data from the modified DynamoDB item that was captured in this stream record"""

    KEYS_ONLY = 0  # only the key attributes of the modified item
    NEW_IMAGE = 1  # the entire item, as it appeared after it was modified.
    OLD_IMAGE = 2  # the entire item, as it appeared before it was modified.
    NEW_AND_OLD_IMAGES = 3  # both the new and the old item images of the item.

class StreamRecord(DictWrapper):
    _deserializer = TypeDeserializer()

    def __init__(self, data: Dict[str, Any]):
        """StreamRecord constructor
        data: Dict[str, Any]
            Represents the dynamodb dict inside DynamoDBStreamEvent's records
        self._deserializer = TypeDeserializer()

    def _deserialize_dynamodb_dict(self, key: str) -> Optional[Dict[str, Any]]:
        """Deserialize DynamoDB records available in `Keys`, `NewImage`, and `OldImage`

        key : str
            DynamoDB key (e.g., Keys, NewImage, or OldImage)

        Optional[Dict[str, Any]]
            Deserialized records in Python native types
        dynamodb_dict = self._data.get(key)
        if dynamodb_dict is None:
            return None

        return {k: self._deserializer.deserialize(v) for k, v in dynamodb_dict.items()}

    def approximate_creation_date_time(self) -> Optional[int]:
        """The approximate date and time when the stream record was created, in UNIX epoch time format."""
        item = self.get("ApproximateCreationDateTime")
        return None if item is None else int(item)

    def keys(self) -> Optional[Dict[str, Any]]:  # type: ignore[override]
        """The primary key attribute(s) for the DynamoDB item that was modified."""
        return self._deserialize_dynamodb_dict("Keys")

    def new_image(self) -> Optional[Dict[str, Any]]:
        """The item in the DynamoDB table as it appeared after it was modified."""
        return self._deserialize_dynamodb_dict("NewImage")

    def old_image(self) -> Optional[Dict[str, Any]]:
        """The item in the DynamoDB table as it appeared before it was modified."""
        return self._deserialize_dynamodb_dict("OldImage")

    def sequence_number(self) -> Optional[str]:
        """The sequence number of the stream record."""
        return self.get("SequenceNumber")

    def size_bytes(self) -> Optional[int]:
        """The size of the stream record, in bytes."""
        item = self.get("SizeBytes")
        return None if item is None else int(item)

    def stream_view_type(self) -> Optional[StreamViewType]:
        """The type of data from the modified DynamoDB item that was captured in this stream record"""
        item = self.get("StreamViewType")
        return None if item is None else StreamViewType[str(item)]

class DynamoDBRecordEventName(Enum):
    INSERT = 0  # a new item was added to the table
    MODIFY = 1  # one or more of an existing item's attributes were modified
    REMOVE = 2  # the item was deleted from the table

class DynamoDBRecord(DictWrapper):
    """A description of a unique event within a stream"""

    def aws_region(self) -> Optional[str]:
        """The region in which the GetRecords request was received"""
        return self.get("awsRegion")

    def dynamodb(self) -> Optional[StreamRecord]:
        """The main body of the stream record, containing all the DynamoDB-specific dicts."""
        stream_record = self.get("dynamodb")
        return None if stream_record is None else StreamRecord(stream_record)

    def event_id(self) -> Optional[str]:
        """A globally unique identifier for the event that was recorded in this stream record."""
        return self.get("eventID")

    def event_name(self) -> Optional[DynamoDBRecordEventName]:
        """The type of data modification that was performed on the DynamoDB table"""
        item = self.get("eventName")
        return None if item is None else DynamoDBRecordEventName[item]

    def event_source(self) -> Optional[str]:
        """The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb."""
        return self.get("eventSource")

    def event_source_arn(self) -> Optional[str]:
        """The Amazon Resource Name (ARN) of the event source"""
        return self.get("eventSourceARN")

    def event_version(self) -> Optional[str]:
        """The version number of the stream record format."""
        return self.get("eventVersion")

    def user_identity(self) -> Optional[dict]:
        """Contains details about the type of identity that made the request"""
        return self.get("userIdentity")

class DynamoDBStreamEvent(DictWrapper):
    """Dynamo DB Stream Event


    **Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.**

        from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
        from aws_lambda_powertools.utilities.typing import LambdaContext

        def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
            for record in event.records:
                # {"N": "123.45"} => Decimal("123.45")
                key: str = record.dynamodb.keys["id"]

    def records(self) -> Iterator[DynamoDBRecord]:
        for record in self["Records"]:
            yield DynamoDBRecord(record)


class DynamoDBRecord (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

A description of a unique event within a stream


data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class DynamoDBRecord(DictWrapper):
    """A description of a unique event within a stream"""

    def aws_region(self) -> Optional[str]:
        """The region in which the GetRecords request was received"""
        return self.get("awsRegion")

    def dynamodb(self) -> Optional[StreamRecord]:
        """The main body of the stream record, containing all the DynamoDB-specific dicts."""
        stream_record = self.get("dynamodb")
        return None if stream_record is None else StreamRecord(stream_record)

    def event_id(self) -> Optional[str]:
        """A globally unique identifier for the event that was recorded in this stream record."""
        return self.get("eventID")

    def event_name(self) -> Optional[DynamoDBRecordEventName]:
        """The type of data modification that was performed on the DynamoDB table"""
        item = self.get("eventName")
        return None if item is None else DynamoDBRecordEventName[item]

    def event_source(self) -> Optional[str]:
        """The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb."""
        return self.get("eventSource")

    def event_source_arn(self) -> Optional[str]:
        """The Amazon Resource Name (ARN) of the event source"""
        return self.get("eventSourceARN")

    def event_version(self) -> Optional[str]:
        """The version number of the stream record format."""
        return self.get("eventVersion")

    def user_identity(self) -> Optional[dict]:
        """Contains details about the type of identity that made the request"""
        return self.get("userIdentity")


  • DictWrapper

Instance variables

var aws_region : Optional[str]

The region in which the GetRecords request was received

Expand source code
def aws_region(self) -> Optional[str]:
    """The region in which the GetRecords request was received"""
    return self.get("awsRegion")
var dynamodb : Optional[StreamRecord]

The main body of the stream record, containing all the DynamoDB-specific dicts.

Expand source code
def dynamodb(self) -> Optional[StreamRecord]:
    """The main body of the stream record, containing all the DynamoDB-specific dicts."""
    stream_record = self.get("dynamodb")
    return None if stream_record is None else StreamRecord(stream_record)
var event_id : Optional[str]

A globally unique identifier for the event that was recorded in this stream record.

Expand source code
def event_id(self) -> Optional[str]:
    """A globally unique identifier for the event that was recorded in this stream record."""
    return self.get("eventID")
var event_name : Optional[DynamoDBRecordEventName]

The type of data modification that was performed on the DynamoDB table

Expand source code
def event_name(self) -> Optional[DynamoDBRecordEventName]:
    """The type of data modification that was performed on the DynamoDB table"""
    item = self.get("eventName")
    return None if item is None else DynamoDBRecordEventName[item]
var event_source : Optional[str]

The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb.

Expand source code
def event_source(self) -> Optional[str]:
    """The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb."""
    return self.get("eventSource")
var event_source_arn : Optional[str]

The Amazon Resource Name (ARN) of the event source

Expand source code
def event_source_arn(self) -> Optional[str]:
    """The Amazon Resource Name (ARN) of the event source"""
    return self.get("eventSourceARN")
var event_version : Optional[str]

The version number of the stream record format.

Expand source code
def event_version(self) -> Optional[str]:
    """The version number of the stream record format."""
    return self.get("eventVersion")
var user_identity : Optional[dict]

Contains details about the type of identity that made the request

Expand source code
def user_identity(self) -> Optional[dict]:
    """Contains details about the type of identity that made the request"""
    return self.get("userIdentity")

Inherited members

class DynamoDBRecordEventName (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class DynamoDBRecordEventName(Enum):
    INSERT = 0  # a new item was added to the table
    MODIFY = 1  # one or more of an existing item's attributes were modified
    REMOVE = 2  # the item was deleted from the table


  • enum.Enum

Class variables

class DynamoDBStreamEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Dynamo DB Stream Event



Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.

from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    for record in event.records:
        # {"N": "123.45"} => Decimal("123.45")
        key: str = record.dynamodb.keys["id"]


data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class DynamoDBStreamEvent(DictWrapper):
    """Dynamo DB Stream Event


    **Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.**

        from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
        from aws_lambda_powertools.utilities.typing import LambdaContext

        def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
            for record in event.records:
                # {"N": "123.45"} => Decimal("123.45")
                key: str = record.dynamodb.keys["id"]

    def records(self) -> Iterator[DynamoDBRecord]:
        for record in self["Records"]:
            yield DynamoDBRecord(record)


  • DictWrapper

Instance variables

var records : Iterator[DynamoDBRecord]
Expand source code
def records(self) -> Iterator[DynamoDBRecord]:
    for record in self["Records"]:
        yield DynamoDBRecord(record)

Inherited members

class StreamRecord (data: Dict[str, Any])

Provides a single read only access to a wrapper dict

StreamRecord constructor Parameters

data : Dict[str, Any]
Represents the dynamodb dict inside DynamoDBStreamEvent's records
Expand source code
class StreamRecord(DictWrapper):
    _deserializer = TypeDeserializer()

    def __init__(self, data: Dict[str, Any]):
        """StreamRecord constructor
        data: Dict[str, Any]
            Represents the dynamodb dict inside DynamoDBStreamEvent's records
        self._deserializer = TypeDeserializer()

    def _deserialize_dynamodb_dict(self, key: str) -> Optional[Dict[str, Any]]:
        """Deserialize DynamoDB records available in `Keys`, `NewImage`, and `OldImage`

        key : str
            DynamoDB key (e.g., Keys, NewImage, or OldImage)

        Optional[Dict[str, Any]]
            Deserialized records in Python native types
        dynamodb_dict = self._data.get(key)
        if dynamodb_dict is None:
            return None

        return {k: self._deserializer.deserialize(v) for k, v in dynamodb_dict.items()}

    def approximate_creation_date_time(self) -> Optional[int]:
        """The approximate date and time when the stream record was created, in UNIX epoch time format."""
        item = self.get("ApproximateCreationDateTime")
        return None if item is None else int(item)

    def keys(self) -> Optional[Dict[str, Any]]:  # type: ignore[override]
        """The primary key attribute(s) for the DynamoDB item that was modified."""
        return self._deserialize_dynamodb_dict("Keys")

    def new_image(self) -> Optional[Dict[str, Any]]:
        """The item in the DynamoDB table as it appeared after it was modified."""
        return self._deserialize_dynamodb_dict("NewImage")

    def old_image(self) -> Optional[Dict[str, Any]]:
        """The item in the DynamoDB table as it appeared before it was modified."""
        return self._deserialize_dynamodb_dict("OldImage")

    def sequence_number(self) -> Optional[str]:
        """The sequence number of the stream record."""
        return self.get("SequenceNumber")

    def size_bytes(self) -> Optional[int]:
        """The size of the stream record, in bytes."""
        item = self.get("SizeBytes")
        return None if item is None else int(item)

    def stream_view_type(self) -> Optional[StreamViewType]:
        """The type of data from the modified DynamoDB item that was captured in this stream record"""
        item = self.get("StreamViewType")
        return None if item is None else StreamViewType[str(item)]


  • DictWrapper

Instance variables

var approximate_creation_date_time : Optional[int]

The approximate date and time when the stream record was created, in UNIX epoch time format.

Expand source code
def approximate_creation_date_time(self) -> Optional[int]:
    """The approximate date and time when the stream record was created, in UNIX epoch time format."""
    item = self.get("ApproximateCreationDateTime")
    return None if item is None else int(item)
var keys : Optional[Dict[str, Any]]

The primary key attribute(s) for the DynamoDB item that was modified.

Expand source code
def keys(self) -> Optional[Dict[str, Any]]:  # type: ignore[override]
    """The primary key attribute(s) for the DynamoDB item that was modified."""
    return self._deserialize_dynamodb_dict("Keys")
var new_image : Optional[Dict[str, Any]]

The item in the DynamoDB table as it appeared after it was modified.

Expand source code
def new_image(self) -> Optional[Dict[str, Any]]:
    """The item in the DynamoDB table as it appeared after it was modified."""
    return self._deserialize_dynamodb_dict("NewImage")
var old_image : Optional[Dict[str, Any]]

The item in the DynamoDB table as it appeared before it was modified.

Expand source code
def old_image(self) -> Optional[Dict[str, Any]]:
    """The item in the DynamoDB table as it appeared before it was modified."""
    return self._deserialize_dynamodb_dict("OldImage")
var sequence_number : Optional[str]

The sequence number of the stream record.

Expand source code
def sequence_number(self) -> Optional[str]:
    """The sequence number of the stream record."""
    return self.get("SequenceNumber")
var size_bytes : Optional[int]

The size of the stream record, in bytes.

Expand source code
def size_bytes(self) -> Optional[int]:
    """The size of the stream record, in bytes."""
    item = self.get("SizeBytes")
    return None if item is None else int(item)
var stream_view_type : Optional[StreamViewType]

The type of data from the modified DynamoDB item that was captured in this stream record

Expand source code
def stream_view_type(self) -> Optional[StreamViewType]:
    """The type of data from the modified DynamoDB item that was captured in this stream record"""
    item = self.get("StreamViewType")
    return None if item is None else StreamViewType[str(item)]

Inherited members

class StreamViewType (value, names=None, *, module=None, qualname=None, type=None, start=1)

The type of data from the modified DynamoDB item that was captured in this stream record

Expand source code
class StreamViewType(Enum):
    """The type of data from the modified DynamoDB item that was captured in this stream record"""

    KEYS_ONLY = 0  # only the key attributes of the modified item
    NEW_IMAGE = 1  # the entire item, as it appeared after it was modified.
    OLD_IMAGE = 2  # the entire item, as it appeared before it was modified.
    NEW_AND_OLD_IMAGES = 3  # both the new and the old item images of the item.


  • enum.Enum

Class variables

class TypeDeserializer

Deserializes DynamoDB types to Python types.

It's based on boto3's DynamoDB TypeDeserializer.

The only notable difference is that for Binary (B, BS) values we return Python Bytes directly, since we don't support Python 2.

Expand source code
class TypeDeserializer:
    Deserializes DynamoDB types to Python types.

    It's based on boto3's [DynamoDB TypeDeserializer](

    The only notable difference is that for Binary (`B`, `BS`) values we return Python Bytes directly,
    since we don't support Python 2.

    def deserialize(self, value: Dict) -> Any:
        """Deserialize DynamoDB data types into Python types.

        value: Any
            DynamoDB value to be deserialized to a python type

            Here are the various conversions:

            DynamoDB                                Python
            --------                                ------
            {'NULL': True}                          None
            {'BOOL': True/False}                    True/False
            {'N': Decimal(value)}                   Decimal(value)
            {'S': string}                           string
            {'B': bytes}                            bytes
            {'NS': [str(value)]}                    set([str(value)])
            {'SS': [string]}                        set([string])
            {'BS': [bytes]}                         set([bytes])
            {'L': list}                             list
            {'M': dict}                             dict

        value: Any
            DynamoDB value to be deserialized to a python type

            Python native type converted from DynamoDB type

        dynamodb_type = list(value.keys())[0]
        deserializer: Optional[Callable] = getattr(self, f"_deserialize_{dynamodb_type}".lower(), None)
        if deserializer is None:
            raise TypeError(f"Dynamodb type {dynamodb_type} is not supported")

        return deserializer(value[dynamodb_type])

    def _deserialize_null(self, value: bool) -> None:
        return None

    def _deserialize_bool(self, value: bool) -> bool:
        return value

    def _deserialize_n(self, value: str) -> Decimal:
        return DYNAMODB_CONTEXT.create_decimal(value)

    def _deserialize_s(self, value: str) -> str:
        return value

    def _deserialize_b(self, value: bytes) -> bytes:
        return value

    def _deserialize_ns(self, value: Sequence[str]) -> Set[Decimal]:
        return set(map(self._deserialize_n, value))

    def _deserialize_ss(self, value: Sequence[str]) -> Set[str]:
        return set(map(self._deserialize_s, value))

    def _deserialize_bs(self, value: Sequence[bytes]) -> Set[bytes]:
        return set(map(self._deserialize_b, value))

    def _deserialize_l(self, value: Sequence[Dict]) -> Sequence[Any]:
        return [self.deserialize(v) for v in value]

    def _deserialize_m(self, value: Dict) -> Dict:
        return {k: self.deserialize(v) for k, v in value.items()}


def deserialize(self, value: Dict) ‑> Any

Deserialize DynamoDB data types into Python types.


value : Any

DynamoDB value to be deserialized to a python type

Here are the various conversions:

DynamoDB Python -------- ------ {'NULL': True} None {'BOOL': True/False} True/False {'N': Decimal(value)} Decimal(value) {'S': string} string {'B': bytes} bytes {'NS': [str(value)]} set([str(value)]) {'SS': [string]} set([string]) {'BS': [bytes]} set([bytes]) {'L': list} list {'M': dict} dict


value : Any
DynamoDB value to be deserialized to a python type


Python native type converted from DynamoDB type
Expand source code
def deserialize(self, value: Dict) -> Any:
    """Deserialize DynamoDB data types into Python types.

    value: Any
        DynamoDB value to be deserialized to a python type

        Here are the various conversions:

        DynamoDB                                Python
        --------                                ------
        {'NULL': True}                          None
        {'BOOL': True/False}                    True/False
        {'N': Decimal(value)}                   Decimal(value)
        {'S': string}                           string
        {'B': bytes}                            bytes
        {'NS': [str(value)]}                    set([str(value)])
        {'SS': [string]}                        set([string])
        {'BS': [bytes]}                         set([bytes])
        {'L': list}                             list
        {'M': dict}                             dict

    value: Any
        DynamoDB value to be deserialized to a python type

        Python native type converted from DynamoDB type

    dynamodb_type = list(value.keys())[0]
    deserializer: Optional[Callable] = getattr(self, f"_deserialize_{dynamodb_type}".lower(), None)
    if deserializer is None:
        raise TypeError(f"Dynamodb type {dynamodb_type} is not supported")

    return deserializer(value[dynamodb_type])