Module aws_lambda_powertools.utilities.idempotency

Utility for adding idempotency to lambda functions

Expand source code
"""
Utility for adding idempotency to lambda functions
"""

from aws_lambda_powertools.utilities.idempotency.persistence.base import (
    BasePersistenceLayer,
)
from aws_lambda_powertools.utilities.idempotency.persistence.dynamodb import (
    DynamoDBPersistenceLayer,
)

from .idempotency import IdempotencyConfig, idempotent, idempotent_function

__all__ = ("DynamoDBPersistenceLayer", "BasePersistenceLayer", "idempotent", "idempotent_function", "IdempotencyConfig")

Sub-modules

aws_lambda_powertools.utilities.idempotency.base
aws_lambda_powertools.utilities.idempotency.config
aws_lambda_powertools.utilities.idempotency.exceptions

Idempotency errors

aws_lambda_powertools.utilities.idempotency.idempotency

Primary interface for idempotent Lambda functions utility

aws_lambda_powertools.utilities.idempotency.persistence

Functions

def idempotent(handler: Callable[[Any, aws_lambda_powertools.utilities.lambda_context.LambdaContext], Any], event: Dict[str, Any], context: LambdaContext, persistence_store: BasePersistenceLayer, config: Optional[IdempotencyConfig] = None, **kwargs) ‑> Any

Decorator to handle idempotency

Parameters

handler : Callable
Lambda's handler
event : Dict
Lambda's Event
context : Dict
Lambda's Context
persistence_store : BasePersistenceLayer
Instance of BasePersistenceLayer to store data
config : IdempotencyConfig
Configuration

Examples

Processes Lambda's event in an idempotent manner

>>> from aws_lambda_powertools.utilities.idempotency import (
>>>    idempotent, DynamoDBPersistenceLayer, IdempotencyConfig
>>> )
>>>
>>> idem_config=IdempotencyConfig(event_key_jmespath="body")
>>> persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")
>>>
>>> @idempotent(config=idem_config, persistence_store=persistence_layer)
>>> def handler(event, context):
>>>     return {"StatusCode": 200}
Expand source code
@lambda_handler_decorator
def idempotent(
    handler: Callable[[Any, LambdaContext], Any],
    event: Dict[str, Any],
    context: LambdaContext,
    persistence_store: BasePersistenceLayer,
    config: Optional[IdempotencyConfig] = None,
    **kwargs,
) -> Any:
    """
    Decorator to handle idempotency

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: Dict
        Lambda's Context
    persistence_store: BasePersistenceLayer
        Instance of BasePersistenceLayer to store data
    config: IdempotencyConfig
        Configuration

    Examples
    --------
    **Processes Lambda's event in an idempotent manner**

        >>> from aws_lambda_powertools.utilities.idempotency import (
        >>>    idempotent, DynamoDBPersistenceLayer, IdempotencyConfig
        >>> )
        >>>
        >>> idem_config=IdempotencyConfig(event_key_jmespath="body")
        >>> persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")
        >>>
        >>> @idempotent(config=idem_config, persistence_store=persistence_layer)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}
    """

    if os.getenv(constants.IDEMPOTENCY_DISABLED_ENV):
        return handler(event, context)

    config = config or IdempotencyConfig()
    config.register_lambda_context(context)

    args = event, context
    idempotency_handler = IdempotencyHandler(
        function=handler,
        function_payload=event,
        config=config,
        persistence_store=persistence_store,
        function_args=args,
        function_kwargs=kwargs,
    )

    return idempotency_handler.handle()
def idempotent_function(function: Optional[~AnyCallableT] = None, *, data_keyword_argument: str, persistence_store: BasePersistenceLayer, config: Optional[IdempotencyConfig] = None) ‑> Any

Decorator to handle idempotency of any function

Parameters

function : Callable
Function to be decorated
data_keyword_argument : str
Keyword parameter name in function's signature that we should hash as idempotency key, e.g. "order"
persistence_store : BasePersistenceLayer
Instance of BasePersistenceLayer to store data
config : IdempotencyConfig
Configuration

Examples

Processes an order in an idempotent manner

from aws_lambda_powertools.utilities.idempotency import (
   idempotent_function, DynamoDBPersistenceLayer, IdempotencyConfig
)

idem_config=IdempotencyConfig(event_key_jmespath="order_id")
persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")

@idempotent_function(data_keyword_argument="order", config=idem_config, persistence_store=persistence_layer)
def process_order(customer_id: str, order: dict, **kwargs):
    return {"StatusCode": 200}
Expand source code
def idempotent_function(
    function: Optional[AnyCallableT] = None,
    *,
    data_keyword_argument: str,
    persistence_store: BasePersistenceLayer,
    config: Optional[IdempotencyConfig] = None,
) -> Any:
    """
    Decorator to handle idempotency of any function

    Parameters
    ----------
    function: Callable
        Function to be decorated
    data_keyword_argument: str
        Keyword parameter name in function's signature that we should hash as idempotency key, e.g. "order"
    persistence_store: BasePersistenceLayer
        Instance of BasePersistenceLayer to store data
    config: IdempotencyConfig
        Configuration

    Examples
    --------
    **Processes an order in an idempotent manner**

        from aws_lambda_powertools.utilities.idempotency import (
           idempotent_function, DynamoDBPersistenceLayer, IdempotencyConfig
        )

        idem_config=IdempotencyConfig(event_key_jmespath="order_id")
        persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")

        @idempotent_function(data_keyword_argument="order", config=idem_config, persistence_store=persistence_layer)
        def process_order(customer_id: str, order: dict, **kwargs):
            return {"StatusCode": 200}
    """

    if not function:
        return cast(
            AnyCallableT,
            functools.partial(
                idempotent_function,
                data_keyword_argument=data_keyword_argument,
                persistence_store=persistence_store,
                config=config,
            ),
        )

    config = config or IdempotencyConfig()

    @functools.wraps(function)
    def decorate(*args, **kwargs):
        if os.getenv(constants.IDEMPOTENCY_DISABLED_ENV):
            return function(*args, **kwargs)

        if data_keyword_argument not in kwargs:
            raise RuntimeError(
                f"Unable to extract '{data_keyword_argument}' from keyword arguments."
                f" Ensure this exists in your function's signature as well as the caller used it as a keyword argument"
            )

        payload = kwargs.get(data_keyword_argument)

        idempotency_handler = IdempotencyHandler(
            function=function,
            function_payload=payload,
            config=config,
            persistence_store=persistence_store,
            function_args=args,
            function_kwargs=kwargs,
        )

        return idempotency_handler.handle()

    return cast(AnyCallableT, decorate)

Classes

class BasePersistenceLayer

Abstract Base Class for Idempotency persistence layer.

Initialize the defaults

Expand source code
class BasePersistenceLayer(ABC):
    """
    Abstract Base Class for Idempotency persistence layer.
    """

    def __init__(self):
        """Initialize the defaults"""
        self.function_name = ""
        self.configured = False
        self.event_key_jmespath: Optional[str] = None
        self.event_key_compiled_jmespath = None
        self.jmespath_options: Optional[dict] = None
        self.payload_validation_enabled = False
        self.validation_key_jmespath = None
        self.raise_on_no_idempotency_key = False
        self.expires_after_seconds: int = 60 * 60  # 1 hour default
        self.use_local_cache = False
        self.hash_function = None

    def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) -> None:
        """
        Initialize the base persistence layer from the configuration settings

        Parameters
        ----------
        config: IdempotencyConfig
            Idempotency configuration settings
        function_name: str, Optional
            The name of the function being decorated
        """
        self.function_name = f"{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, 'test-func')}.{function_name or ''}"

        if self.configured:
            # Prevent being reconfigured multiple times
            return
        self.configured = True

        self.event_key_jmespath = config.event_key_jmespath
        if config.event_key_jmespath:
            self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath)
        self.jmespath_options = config.jmespath_options
        if not self.jmespath_options:
            self.jmespath_options = {"custom_functions": PowertoolsFunctions()}
        if config.payload_validation_jmespath:
            self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath)
            self.payload_validation_enabled = True
        self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
        self.expires_after_seconds = config.expires_after_seconds
        self.use_local_cache = config.use_local_cache
        if self.use_local_cache:
            self._cache = LRUDict(max_items=config.local_cache_max_items)
        self.hash_function = getattr(hashlib, config.hash_function)

    def _get_hashed_idempotency_key(self, data: Dict[str, Any]) -> str:
        """
        Extract idempotency key and return a hashed representation

        Parameters
        ----------
        data: Dict[str, Any]
            Incoming data

        Returns
        -------
        str
            Hashed representation of the data extracted by the jmespath expression

        """
        if self.event_key_jmespath:
            data = self.event_key_compiled_jmespath.search(data, options=jmespath.Options(**self.jmespath_options))

        if self.is_missing_idempotency_key(data=data):
            if self.raise_on_no_idempotency_key:
                raise IdempotencyKeyError("No data found to create a hashed idempotency_key")
            warnings.warn(f"No value found for idempotency_key. jmespath: {self.event_key_jmespath}")

        generated_hash = self._generate_hash(data=data)
        return f"{self.function_name}#{generated_hash}"

    @staticmethod
    def is_missing_idempotency_key(data) -> bool:
        if type(data).__name__ in ("tuple", "list", "dict"):
            return all(x is None for x in data)
        return not data

    def _get_hashed_payload(self, data: Dict[str, Any]) -> str:
        """
        Extract payload using validation key jmespath and return a hashed representation

        Parameters
        ----------
        data: Dict[str, Any]
            Payload

        Returns
        -------
        str
            Hashed representation of the data extracted by the jmespath expression

        """
        if not self.payload_validation_enabled:
            return ""
        data = self.validation_key_jmespath.search(data)
        return self._generate_hash(data=data)

    def _generate_hash(self, data: Any) -> str:
        """
        Generate a hash value from the provided data

        Parameters
        ----------
        data: Any
            The data to hash

        Returns
        -------
        str
            Hashed representation of the provided data

        """
        hashed_data = self.hash_function(json.dumps(data, cls=Encoder, sort_keys=True).encode())
        return hashed_data.hexdigest()

    def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> None:
        """
        Validate that the hashed payload matches data provided and stored data record

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        data_record: DataRecord
            DataRecord instance

        Raises
        ----------
        IdempotencyValidationError
            Payload doesn't match the stored record for the given idempotency key

        """
        if self.payload_validation_enabled:
            data_hash = self._get_hashed_payload(data=data)
            if data_record.payload_hash != data_hash:
                raise IdempotencyValidationError("Payload does not match stored record for this event key")

    def _get_expiry_timestamp(self) -> int:
        """

        Returns
        -------
        int
            unix timestamp of expiry date for idempotency record

        """
        now = datetime.datetime.now()
        period = datetime.timedelta(seconds=self.expires_after_seconds)
        return int((now + period).timestamp())

    def _save_to_cache(self, data_record: DataRecord):
        """
        Save data_record to local cache except when status is "INPROGRESS"

        NOTE: We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the
        execution environment

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance

        Returns
        -------

        """
        if not self.use_local_cache:
            return
        if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
            return
        self._cache[data_record.idempotency_key] = data_record

    def _retrieve_from_cache(self, idempotency_key: str):
        if not self.use_local_cache:
            return
        cached_record = self._cache.get(key=idempotency_key)
        if cached_record:
            if not cached_record.is_expired:
                return cached_record
            logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}")
            self._delete_from_cache(idempotency_key=idempotency_key)

    def _delete_from_cache(self, idempotency_key: str):
        if not self.use_local_cache:
            return
        if idempotency_key in self._cache:
            del self._cache[idempotency_key]

    def save_success(self, data: Dict[str, Any], result: dict) -> None:
        """
        Save record of function's execution completing successfully

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        result: dict
            The response from function
        """
        response_data = json.dumps(result, cls=Encoder, sort_keys=True)

        data_record = DataRecord(
            idempotency_key=self._get_hashed_idempotency_key(data=data),
            status=STATUS_CONSTANTS["COMPLETED"],
            expiry_timestamp=self._get_expiry_timestamp(),
            response_data=response_data,
            payload_hash=self._get_hashed_payload(data=data),
        )
        logger.debug(
            f"Function successfully executed. Saving record to persistence store with "
            f"idempotency key: {data_record.idempotency_key}"
        )
        self._update_record(data_record=data_record)

        self._save_to_cache(data_record=data_record)

    def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
        """
        Save record of function's execution being in progress

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        remaining_time_in_millis: Optional[int]
            If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
        """
        data_record = DataRecord(
            idempotency_key=self._get_hashed_idempotency_key(data=data),
            status=STATUS_CONSTANTS["INPROGRESS"],
            expiry_timestamp=self._get_expiry_timestamp(),
            payload_hash=self._get_hashed_payload(data=data),
        )

        if remaining_time_in_millis:
            now = datetime.datetime.now()
            period = datetime.timedelta(milliseconds=remaining_time_in_millis)
            timestamp = (now + period).timestamp()

            data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
        else:
            warnings.warn(
                "Couldn't determine the remaining time left. "
                "Did you call register_lambda_context on IdempotencyConfig?"
            )

        logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

        if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
            raise IdempotencyItemAlreadyExistsError

        self._put_record(data_record=data_record)

    def delete_record(self, data: Dict[str, Any], exception: Exception):
        """
        Delete record from the persistence store

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        exception
            The exception raised by the function
        """
        data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(data=data))

        logger.debug(
            f"Function raised an exception ({type(exception).__name__}). Clearing in progress record in persistence "
            f"store for idempotency key: {data_record.idempotency_key}"
        )
        self._delete_record(data_record=data_record)

        self._delete_from_cache(idempotency_key=data_record.idempotency_key)

    def get_record(self, data: Dict[str, Any]) -> DataRecord:
        """
        Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

        Parameters
        ----------
        data: Dict[str, Any]
            Payload

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        IdempotencyValidationError
            Payload doesn't match the stored record for the given idempotency key
        """

        idempotency_key = self._get_hashed_idempotency_key(data=data)

        cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
        if cached_record:
            logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
            self._validate_payload(data=data, data_record=cached_record)
            return cached_record

        record = self._get_record(idempotency_key=idempotency_key)

        self._save_to_cache(data_record=record)

        self._validate_payload(data=data, data_record=record)
        return record

    @abstractmethod
    def _get_record(self, idempotency_key) -> DataRecord:
        """
        Retrieve item from persistence store using idempotency key and return it as a DataRecord instance.

        Parameters
        ----------
        idempotency_key

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        """
        raise NotImplementedError

    @abstractmethod
    def _put_record(self, data_record: DataRecord) -> None:
        """
        Add a DataRecord to persistence store if it does not already exist with that key. Raise ItemAlreadyExists
        if a non-expired entry already exists.

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

    @abstractmethod
    def _update_record(self, data_record: DataRecord) -> None:
        """
        Update item in persistence store

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

    @abstractmethod
    def _delete_record(self, data_record: DataRecord) -> None:
        """
        Remove item from persistence store
        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

Ancestors

  • abc.ABC

Subclasses

Static methods

def is_missing_idempotency_key(data) ‑> bool
Expand source code
@staticmethod
def is_missing_idempotency_key(data) -> bool:
    if type(data).__name__ in ("tuple", "list", "dict"):
        return all(x is None for x in data)
    return not data

Methods

def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) ‑> None

Initialize the base persistence layer from the configuration settings

Parameters

config : IdempotencyConfig
Idempotency configuration settings
function_name : str, Optional
The name of the function being decorated
Expand source code
def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) -> None:
    """
    Initialize the base persistence layer from the configuration settings

    Parameters
    ----------
    config: IdempotencyConfig
        Idempotency configuration settings
    function_name: str, Optional
        The name of the function being decorated
    """
    self.function_name = f"{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, 'test-func')}.{function_name or ''}"

    if self.configured:
        # Prevent being reconfigured multiple times
        return
    self.configured = True

    self.event_key_jmespath = config.event_key_jmespath
    if config.event_key_jmespath:
        self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath)
    self.jmespath_options = config.jmespath_options
    if not self.jmespath_options:
        self.jmespath_options = {"custom_functions": PowertoolsFunctions()}
    if config.payload_validation_jmespath:
        self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath)
        self.payload_validation_enabled = True
    self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
    self.expires_after_seconds = config.expires_after_seconds
    self.use_local_cache = config.use_local_cache
    if self.use_local_cache:
        self._cache = LRUDict(max_items=config.local_cache_max_items)
    self.hash_function = getattr(hashlib, config.hash_function)
def delete_record(self, data: Dict[str, Any], exception: Exception)

Delete record from the persistence store

Parameters

data : Dict[str, Any]
Payload
exception
The exception raised by the function
Expand source code
def delete_record(self, data: Dict[str, Any], exception: Exception):
    """
    Delete record from the persistence store

    Parameters
    ----------
    data: Dict[str, Any]
        Payload
    exception
        The exception raised by the function
    """
    data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(data=data))

    logger.debug(
        f"Function raised an exception ({type(exception).__name__}). Clearing in progress record in persistence "
        f"store for idempotency key: {data_record.idempotency_key}"
    )
    self._delete_record(data_record=data_record)

    self._delete_from_cache(idempotency_key=data_record.idempotency_key)
def get_record(self, data: Dict[str, Any]) ‑> DataRecord

Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

Parameters

data : Dict[str, Any]
Payload

Returns

DataRecord
DataRecord representation of existing record found in persistence store

Raises

IdempotencyItemNotFoundError
Exception raised if no record exists in persistence store with the idempotency key
IdempotencyValidationError
Payload doesn't match the stored record for the given idempotency key
Expand source code
def get_record(self, data: Dict[str, Any]) -> DataRecord:
    """
    Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

    Parameters
    ----------
    data: Dict[str, Any]
        Payload

    Returns
    -------
    DataRecord
        DataRecord representation of existing record found in persistence store

    Raises
    ------
    IdempotencyItemNotFoundError
        Exception raised if no record exists in persistence store with the idempotency key
    IdempotencyValidationError
        Payload doesn't match the stored record for the given idempotency key
    """

    idempotency_key = self._get_hashed_idempotency_key(data=data)

    cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
    if cached_record:
        logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
        self._validate_payload(data=data, data_record=cached_record)
        return cached_record

    record = self._get_record(idempotency_key=idempotency_key)

    self._save_to_cache(data_record=record)

    self._validate_payload(data=data, data_record=record)
    return record
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) ‑> None

Save record of function's execution being in progress

Parameters

data : Dict[str, Any]
Payload
remaining_time_in_millis : Optional[int]
If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
Expand source code
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
    """
    Save record of function's execution being in progress

    Parameters
    ----------
    data: Dict[str, Any]
        Payload
    remaining_time_in_millis: Optional[int]
        If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
    """
    data_record = DataRecord(
        idempotency_key=self._get_hashed_idempotency_key(data=data),
        status=STATUS_CONSTANTS["INPROGRESS"],
        expiry_timestamp=self._get_expiry_timestamp(),
        payload_hash=self._get_hashed_payload(data=data),
    )

    if remaining_time_in_millis:
        now = datetime.datetime.now()
        period = datetime.timedelta(milliseconds=remaining_time_in_millis)
        timestamp = (now + period).timestamp()

        data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
    else:
        warnings.warn(
            "Couldn't determine the remaining time left. "
            "Did you call register_lambda_context on IdempotencyConfig?"
        )

    logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

    if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
        raise IdempotencyItemAlreadyExistsError

    self._put_record(data_record=data_record)
def save_success(self, data: Dict[str, Any], result: dict) ‑> None

Save record of function's execution completing successfully

Parameters

data : Dict[str, Any]
Payload
result : dict
The response from function
Expand source code
def save_success(self, data: Dict[str, Any], result: dict) -> None:
    """
    Save record of function's execution completing successfully

    Parameters
    ----------
    data: Dict[str, Any]
        Payload
    result: dict
        The response from function
    """
    response_data = json.dumps(result, cls=Encoder, sort_keys=True)

    data_record = DataRecord(
        idempotency_key=self._get_hashed_idempotency_key(data=data),
        status=STATUS_CONSTANTS["COMPLETED"],
        expiry_timestamp=self._get_expiry_timestamp(),
        response_data=response_data,
        payload_hash=self._get_hashed_payload(data=data),
    )
    logger.debug(
        f"Function successfully executed. Saving record to persistence store with "
        f"idempotency key: {data_record.idempotency_key}"
    )
    self._update_record(data_record=data_record)

    self._save_to_cache(data_record=data_record)
class DynamoDBPersistenceLayer (table_name: str, key_attr: str = 'id', static_pk_value: Optional[str] = None, sort_key_attr: Optional[str] = None, expiry_attr: str = 'expiration', in_progress_expiry_attr: str = 'in_progress_expiration', status_attr: str = 'status', data_attr: str = 'data', validation_key_attr: str = 'validation', boto_config: Optional[botocore.config.Config] = None, boto3_session: Optional[boto3.session.Session] = None)

Abstract Base Class for Idempotency persistence layer.

Initialize the DynamoDB client

Parameters

table_name : str
Name of the table to use for storing execution records
key_attr : str, optional
DynamoDB attribute name for partition key, by default "id"
static_pk_value : str, optional
DynamoDB attribute value for partition key, by default "idempotency#". This will be used if the sort_key_attr is set.
sort_key_attr : str, optional
DynamoDB attribute name for the sort key
expiry_attr : str, optional
DynamoDB attribute name for expiry timestamp, by default "expiration"
in_progress_expiry_attr : str, optional
DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
status_attr : str, optional
DynamoDB attribute name for status, by default "status"
data_attr : str, optional
DynamoDB attribute name for response data, by default "data"
boto_config : botocore.config.Config, optional
Botocore configuration to pass during client initialization
boto3_session : boto3.session.Session, optional
Boto3 session to use for AWS API communication

Examples

Create a DynamoDB persistence layer with custom settings

>>> from aws_lambda_powertools.utilities.idempotency import (
>>>    idempotent, DynamoDBPersistenceLayer
>>> )
>>>
>>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store")
>>>
>>> @idempotent(persistence_store=persistence_store)
>>> def handler(event, context):
>>>     return {"StatusCode": 200}
Expand source code
class DynamoDBPersistenceLayer(BasePersistenceLayer):
    def __init__(
        self,
        table_name: str,
        key_attr: str = "id",
        static_pk_value: Optional[str] = None,
        sort_key_attr: Optional[str] = None,
        expiry_attr: str = "expiration",
        in_progress_expiry_attr: str = "in_progress_expiration",
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
        boto_config: Optional[Config] = None,
        boto3_session: Optional[boto3.session.Session] = None,
    ):
        """
        Initialize the DynamoDB client

        Parameters
        ----------
        table_name: str
            Name of the table to use for storing execution records
        key_attr: str, optional
            DynamoDB attribute name for partition key, by default "id"
        static_pk_value: str, optional
            DynamoDB attribute value for partition key, by default "idempotency#<function-name>".
            This will be used if the sort_key_attr is set.
        sort_key_attr: str, optional
            DynamoDB attribute name for the sort key
        expiry_attr: str, optional
            DynamoDB attribute name for expiry timestamp, by default "expiration"
        in_progress_expiry_attr: str, optional
            DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
        status_attr: str, optional
            DynamoDB attribute name for status, by default "status"
        data_attr: str, optional
            DynamoDB attribute name for response data, by default "data"
        boto_config: botocore.config.Config, optional
            Botocore configuration to pass during client initialization
        boto3_session : boto3.session.Session, optional
            Boto3 session to use for AWS API communication

        Examples
        --------
        **Create a DynamoDB persistence layer with custom settings**

            >>> from aws_lambda_powertools.utilities.idempotency import (
            >>>    idempotent, DynamoDBPersistenceLayer
            >>> )
            >>>
            >>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store")
            >>>
            >>> @idempotent(persistence_store=persistence_store)
            >>> def handler(event, context):
            >>>     return {"StatusCode": 200}
        """

        self._boto_config = boto_config or Config()
        self._boto3_session = boto3_session or boto3.session.Session()
        self._client = self._boto3_session.client("dynamodb", config=self._boto_config)

        if sort_key_attr == key_attr:
            raise ValueError(f"key_attr [{key_attr}] and sort_key_attr [{sort_key_attr}] cannot be the same!")

        if static_pk_value is None:
            static_pk_value = f"idempotency#{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, '')}"

        self.table_name = table_name
        self.key_attr = key_attr
        self.static_pk_value = static_pk_value
        self.sort_key_attr = sort_key_attr
        self.expiry_attr = expiry_attr
        self.in_progress_expiry_attr = in_progress_expiry_attr
        self.status_attr = status_attr
        self.data_attr = data_attr
        self.validation_key_attr = validation_key_attr

        self._deserializer = TypeDeserializer()

        super(DynamoDBPersistenceLayer, self).__init__()

    def _get_key(self, idempotency_key: str) -> dict:
        if self.sort_key_attr:
            return {self.key_attr: {"S": self.static_pk_value}, self.sort_key_attr: {"S": idempotency_key}}
        return {self.key_attr: {"S": idempotency_key}}

    def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
        """
        Translate raw item records from DynamoDB to DataRecord

        Parameters
        ----------
        item: Dict[str, Union[str, int]]
            Item format from dynamodb response

        Returns
        -------
        DataRecord
            representation of item

        """
        data = self._deserializer.deserialize({"M": item})
        return DataRecord(
            idempotency_key=data[self.key_attr],
            status=data[self.status_attr],
            expiry_timestamp=data[self.expiry_attr],
            in_progress_expiry_timestamp=data.get(self.in_progress_expiry_attr),
            response_data=data.get(self.data_attr),
            payload_hash=data.get(self.validation_key_attr),
        )

    def _get_record(self, idempotency_key) -> DataRecord:
        response = self._client.get_item(
            TableName=self.table_name, Key=self._get_key(idempotency_key), ConsistentRead=True
        )
        try:
            item = response["Item"]
        except KeyError as exc:
            raise IdempotencyItemNotFoundError from exc
        return self._item_to_data_record(item)

    def _put_record(self, data_record: DataRecord) -> None:
        item = {
            **self._get_key(data_record.idempotency_key),
            self.key_attr: {"S": data_record.idempotency_key},
            self.expiry_attr: {"N": str(data_record.expiry_timestamp)},
            self.status_attr: {"S": data_record.status},
        }

        if data_record.in_progress_expiry_timestamp is not None:
            item[self.in_progress_expiry_attr] = {"N": str(data_record.in_progress_expiry_timestamp)}

        if self.payload_validation_enabled and data_record.payload_hash:
            item[self.validation_key_attr] = {"S": data_record.payload_hash}

        now = datetime.datetime.now()
        try:
            logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")

            # |     LOCKED     |         RETRY if status = "INPROGRESS"                |     RETRY
            # |----------------|-------------------------------------------------------|-------------> .... (time)
            # |             Lambda                                              Idempotency Record
            # |             Timeout                                                 Timeout
            # |       (in_progress_expiry)                                          (expiry)

            # Conditions to successfully save a record:

            # The idempotency key does not exist:
            #    - first time that this invocation key is used
            #    - previous invocation with the same key was deleted due to TTL
            idempotency_key_not_exist = "attribute_not_exists(#id)"

            # The idempotency record exists but it's expired:
            idempotency_expiry_expired = "#expiry < :now"

            # The status of the record is "INPROGRESS", there is an in-progress expiry timestamp, but it's expired
            inprogress_expiry_expired = " AND ".join(
                [
                    "#status = :inprogress",
                    "attribute_exists(#in_progress_expiry)",
                    "#in_progress_expiry < :now_in_millis",
                ]
            )

            condition_expression = (
                f"{idempotency_key_not_exist} OR {idempotency_expiry_expired} OR ({inprogress_expiry_expired})"
            )
            self._client.put_item(
                TableName=self.table_name,
                Item=item,
                ConditionExpression=condition_expression,
                ExpressionAttributeNames={
                    "#id": self.key_attr,
                    "#expiry": self.expiry_attr,
                    "#in_progress_expiry": self.in_progress_expiry_attr,
                    "#status": self.status_attr,
                },
                ExpressionAttributeValues={
                    ":now": {"N": str(int(now.timestamp()))},
                    ":now_in_millis": {"N": str(int(now.timestamp() * 1000))},
                    ":inprogress": {"S": STATUS_CONSTANTS["INPROGRESS"]},
                },
            )
        except ClientError as exc:
            error_code = exc.response.get("Error", {}).get("Code")
            if error_code == "ConditionalCheckFailedException":
                logger.debug(
                    f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}"
                )
                raise IdempotencyItemAlreadyExistsError from exc
            else:
                raise

    def _update_record(self, data_record: DataRecord):
        logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
        update_expression = "SET #response_data = :response_data, #expiry = :expiry, " "#status = :status"
        expression_attr_values = {
            ":expiry": {"N": str(data_record.expiry_timestamp)},
            ":response_data": {"S": data_record.response_data},
            ":status": {"S": data_record.status},
        }
        expression_attr_names = {
            "#expiry": self.expiry_attr,
            "#response_data": self.data_attr,
            "#status": self.status_attr,
        }

        if self.payload_validation_enabled:
            update_expression += ", #validation_key = :validation_key"
            expression_attr_values[":validation_key"] = {"S": data_record.payload_hash}
            expression_attr_names["#validation_key"] = self.validation_key_attr

        kwargs = {
            "Key": self._get_key(data_record.idempotency_key),
            "UpdateExpression": update_expression,
            "ExpressionAttributeValues": expression_attr_values,
            "ExpressionAttributeNames": expression_attr_names,
        }

        self._client.update_item(TableName=self.table_name, **kwargs)

    def _delete_record(self, data_record: DataRecord) -> None:
        logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}")
        self._client.delete_item(TableName=self.table_name, Key={**self._get_key(data_record.idempotency_key)})

Ancestors

Inherited members

class IdempotencyConfig (event_key_jmespath: str = '', payload_validation_jmespath: str = '', jmespath_options: Optional[Dict[~KT, ~VT]] = None, raise_on_no_idempotency_key: bool = False, expires_after_seconds: int = 3600, use_local_cache: bool = False, local_cache_max_items: int = 256, hash_function: str = 'md5', lambda_context: Optional[LambdaContext] = None)

Initialize the base persistence layer

Parameters

event_key_jmespath : str
A jmespath expression to extract the idempotency key from the event record
payload_validation_jmespath : str
A jmespath expression to extract the payload to be validated from the event record
raise_on_no_idempotency_key : bool, optional
Raise exception if no idempotency key was found in the request, by default False
expires_after_seconds : int
The number of seconds to wait before a record is expired
use_local_cache : bool, optional
Whether to locally cache idempotency results, by default False
local_cache_max_items : int, optional
Max number of items to store in local cache, by default 1024
hash_function : str, optional
Function to use for calculating hashes, by default md5.
lambda_context : LambdaContext, optional
Lambda Context containing information about the invocation, function and execution environment.
Expand source code
class IdempotencyConfig:
    def __init__(
        self,
        event_key_jmespath: str = "",
        payload_validation_jmespath: str = "",
        jmespath_options: Optional[Dict] = None,
        raise_on_no_idempotency_key: bool = False,
        expires_after_seconds: int = 60 * 60,  # 1 hour default
        use_local_cache: bool = False,
        local_cache_max_items: int = 256,
        hash_function: str = "md5",
        lambda_context: Optional[LambdaContext] = None,
    ):
        """
        Initialize the base persistence layer

        Parameters
        ----------
        event_key_jmespath: str
            A jmespath expression to extract the idempotency key from the event record
        payload_validation_jmespath: str
            A jmespath expression to extract the payload to be validated from the event record
        raise_on_no_idempotency_key: bool, optional
            Raise exception if no idempotency key was found in the request, by default False
        expires_after_seconds: int
            The number of seconds to wait before a record is expired
        use_local_cache: bool, optional
            Whether to locally cache idempotency results, by default False
        local_cache_max_items: int, optional
            Max number of items to store in local cache, by default 1024
        hash_function: str, optional
            Function to use for calculating hashes, by default md5.
        lambda_context: LambdaContext, optional
            Lambda Context containing information about the invocation, function and execution environment.
        """
        self.event_key_jmespath = event_key_jmespath
        self.payload_validation_jmespath = payload_validation_jmespath
        self.jmespath_options = jmespath_options
        self.raise_on_no_idempotency_key = raise_on_no_idempotency_key
        self.expires_after_seconds = expires_after_seconds
        self.use_local_cache = use_local_cache
        self.local_cache_max_items = local_cache_max_items
        self.hash_function = hash_function
        self.lambda_context: Optional[LambdaContext] = lambda_context

    def register_lambda_context(self, lambda_context: LambdaContext):
        """Captures the Lambda context, to calculate the remaining time before the invocation times out"""
        self.lambda_context = lambda_context

Methods

def register_lambda_context(self, lambda_context: LambdaContext)

Captures the Lambda context, to calculate the remaining time before the invocation times out

Expand source code
def register_lambda_context(self, lambda_context: LambdaContext):
    """Captures the Lambda context, to calculate the remaining time before the invocation times out"""
    self.lambda_context = lambda_context