Module aws_lambda_powertools.utilities.idempotency

Utility for adding idempotency to lambda functions

Expand source code
"""
Utility for adding idempotency to lambda functions
"""

from aws_lambda_powertools.utilities.idempotency.persistence.base import BasePersistenceLayer
from aws_lambda_powertools.utilities.idempotency.persistence.dynamodb import DynamoDBPersistenceLayer

from .idempotency import IdempotencyConfig, idempotent

__all__ = ("DynamoDBPersistenceLayer", "BasePersistenceLayer", "idempotent", "IdempotencyConfig")

Sub-modules

aws_lambda_powertools.utilities.idempotency.config
aws_lambda_powertools.utilities.idempotency.exceptions

Idempotency errors

aws_lambda_powertools.utilities.idempotency.idempotency

Primary interface for idempotent Lambda functions utility

aws_lambda_powertools.utilities.idempotency.persistence

Functions

def idempotent(handler: Callable[[Any, aws_lambda_powertools.utilities.lambda_context.LambdaContext], Any], event: Dict[str, Any], context: LambdaContext, persistence_store: BasePersistenceLayer, config: IdempotencyConfig = None) ‑> Any

Middleware to handle idempotency

Parameters

handler : Callable
Lambda's handler
event : Dict
Lambda's Event
context : LambdaContext
Lambda's Context
persistence_store : BasePersistenceLayer
Instance of BasePersistenceLayer to store data
config : IdempotencyConfig
Configuration

Examples

Processes Lambda's event in an idempotent manner

>>> from aws_lambda_powertools.utilities.idempotency import (
>>>    idempotent, DynamoDBPersistenceLayer, IdempotencyConfig
>>> )
>>>
>>> idem_config=IdempotencyConfig(event_key_jmespath="body")
>>> persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")
>>>
>>> @idempotent(config=idem_config, persistence_store=persistence_layer)
>>> def handler(event, context):
>>>     return {"StatusCode": 200}
Expand source code
@lambda_handler_decorator
def idempotent(
    handler: Callable[[Any, LambdaContext], Any],
    event: Dict[str, Any],
    context: LambdaContext,
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig = None,
) -> Any:
    """
    Middleware that runs a Lambda handler through the idempotency handler

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: LambdaContext
        Lambda's Context
    persistence_store: BasePersistenceLayer
        Instance of BasePersistenceLayer to store data
    config: IdempotencyConfig
        Configuration; a default IdempotencyConfig() is used when omitted

    Returns
    -------
    Any
        Whatever IdempotencyHandler.handle() returns

    Raises
    ------
    IdempotencyInconsistentStateError
        Re-raised once the retries are exhausted

    Examples
    --------
    **Processes Lambda's event in an idempotent manner**

        >>> from aws_lambda_powertools.utilities.idempotency import (
        >>>    idempotent, DynamoDBPersistenceLayer, IdempotencyConfig
        >>> )
        >>>
        >>> idem_config=IdempotencyConfig(event_key_jmespath="body")
        >>> persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")
        >>>
        >>> @idempotent(config=idem_config, persistence_store=persistence_layer)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}
    """

    effective_config = config if config else IdempotencyConfig()
    idempotency_handler = IdempotencyHandler(
        lambda_handler=handler,
        event=event,
        context=context,
        persistence_store=persistence_store,
        config=effective_config,
    )

    # IdempotencyInconsistentStateError can happen under rare but expected cases when persistent state changes in the
    # small time between put & get requests. In most cases we can retry successfully on this exception.
    # Maintenance: Allow customers to specify number of retries
    max_handler_retries = 2
    retries_left = max_handler_retries
    while True:
        try:
            return idempotency_handler.handle()
        except IdempotencyInconsistentStateError:
            if retries_left == 0:
                # Allow the exception to bubble up after max retries exceeded
                raise
            retries_left -= 1

Classes

class BasePersistenceLayer

Abstract Base Class for Idempotency persistence layer.

Initialize the defaults

Expand source code
class BasePersistenceLayer(ABC):
    """
    Abstract Base Class for Idempotency persistence layer.

    Concrete subclasses implement the storage primitives (`_get_record`, `_put_record`,
    `_update_record`, `_delete_record`); this base class handles key hashing, payload
    validation, expiry calculation and the optional local cache.
    """

    def __init__(self):
        """Initialize the defaults; real values are applied later by `configure`"""
        self.configured = False
        self.event_key_jmespath: Optional[str] = None
        self.event_key_compiled_jmespath = None
        self.jmespath_options: Optional[dict] = None
        self.payload_validation_enabled = False
        self.validation_key_jmespath = None
        self.raise_on_no_idempotency_key = False
        self.expires_after_seconds: int = 60 * 60  # 1 hour default
        self.use_local_cache = False
        self._cache: Optional[LRUDict] = None
        self.hash_function = None

    def configure(self, config: IdempotencyConfig) -> None:
        """
        Initialize the base persistence layer from the configuration settings

        Only the first call has any effect; later calls return immediately.

        Parameters
        ----------
        config: IdempotencyConfig
            Idempotency configuration settings
        """
        if self.configured:
            # Prevent being reconfigured multiple times
            return
        self.configured = True

        self.event_key_jmespath = config.event_key_jmespath
        if config.event_key_jmespath:
            self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath)
        self.jmespath_options = config.jmespath_options
        if not self.jmespath_options:
            self.jmespath_options = {"custom_functions": PowertoolsFunctions()}
        if config.payload_validation_jmespath:
            self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath)
            self.payload_validation_enabled = True
        self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
        self.expires_after_seconds = config.expires_after_seconds
        self.use_local_cache = config.use_local_cache
        if self.use_local_cache:
            self._cache = LRUDict(max_items=config.local_cache_max_items)
        # hash_function is the name of a hashlib constructor, e.g. "md5"
        self.hash_function = getattr(hashlib, config.hash_function)

    def _get_hashed_idempotency_key(self, event: Dict[str, Any], context: LambdaContext) -> str:
        """
        Extract data from lambda event using event key jmespath, and return a hashed representation

        When no event key jmespath is configured the whole event is hashed.

        Parameters
        ----------
        event: Dict[str, Any]
            Lambda event
        context: LambdaContext
            Lambda context

        Returns
        -------
        str
            "<function name>#<hash>" where the hash is computed from the data extracted by the jmespath expression

        Raises
        ------
        IdempotencyKeyError
            No data was found for the key and raise_on_no_idempotency_key is enabled

        """
        data = event

        if self.event_key_jmespath:
            data = self.event_key_compiled_jmespath.search(event, options=jmespath.Options(**self.jmespath_options))

        if self.is_missing_idempotency_key(data):
            if self.raise_on_no_idempotency_key:
                raise IdempotencyKeyError("No data found to create a hashed idempotency_key")
            # Not configured to raise: warn and carry on hashing whatever was extracted
            warnings.warn(f"No value found for idempotency_key. jmespath: {self.event_key_jmespath}")

        generated_hash = self._generate_hash(data)
        return f"{context.function_name}#{generated_hash}"

    @staticmethod
    def is_missing_idempotency_key(data) -> bool:
        """
        Tell whether the data extracted for the idempotency key counts as missing

        Parameters
        ----------
        data
            Value extracted from the event

        Returns
        -------
        bool
            True when data is a tuple/list/dict whose items are all None (including empty containers; for a
            dict the keys are inspected), or any other falsy value such as None or "". Numbers and booleans
            are never missing, so 0 and False are usable keys.
        """
        if isinstance(data, (tuple, list, dict)):
            return all(x is None for x in data)
        if isinstance(data, (int, float, bool)):
            # 0, 0.0 and False are legitimate extracted values, not an absent key
            return False
        return not data

    def _get_hashed_payload(self, lambda_event: Dict[str, Any]) -> str:
        """
        Extract data from lambda event using validation key jmespath, and return a hashed representation

        Parameters
        ----------
        lambda_event: Dict[str, Any]
            Lambda event

        Returns
        -------
        str
            Hashed representation of the data extracted by the jmespath expression, or an empty string when
            payload validation is not enabled

        """
        if not self.payload_validation_enabled:
            return ""
        data = self.validation_key_jmespath.search(lambda_event)
        return self._generate_hash(data)

    def _generate_hash(self, data: Any) -> str:
        """
        Generate a hash value from the provided data

        The data is serialized to JSON first and the configured hash function is applied to the encoded text.

        Parameters
        ----------
        data: Any
            The data to hash

        Returns
        -------
        str
            Hex digest of the provided data

        """
        hashed_data = self.hash_function(json.dumps(data, cls=Encoder).encode())
        return hashed_data.hexdigest()

    def _validate_payload(self, lambda_event: Dict[str, Any], data_record: DataRecord) -> None:
        """
        Validate that the hashed payload matches in the lambda event and stored data record

        Does nothing when payload validation is not enabled.

        Parameters
        ----------
        lambda_event: Dict[str, Any]
            Lambda event
        data_record: DataRecord
            DataRecord instance

        Raises
        ----------
        IdempotencyValidationError
            Event payload doesn't match the stored record for the given idempotency key

        """
        if self.payload_validation_enabled:
            lambda_payload_hash = self._get_hashed_payload(lambda_event)
            if data_record.payload_hash != lambda_payload_hash:
                raise IdempotencyValidationError("Payload does not match stored record for this event key")

    def _get_expiry_timestamp(self) -> int:
        """
        Calculate when a record created now should expire (now + expires_after_seconds)

        Returns
        -------
        int
            unix timestamp of expiry date for idempotency record

        """
        now = datetime.datetime.now()
        period = datetime.timedelta(seconds=self.expires_after_seconds)
        return int((now + period).timestamp())

    def _save_to_cache(self, data_record: DataRecord):
        """
        Save data_record to local cache except when status is "INPROGRESS"

        NOTE: We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the
        execution environment

        Does nothing when the local cache is disabled.

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """
        if not self.use_local_cache:
            return
        if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
            return
        self._cache[data_record.idempotency_key] = data_record

    def _retrieve_from_cache(self, idempotency_key: str):
        """
        Look up a record in the local cache, evicting it if it has expired

        Parameters
        ----------
        idempotency_key: str
            Hashed idempotency key

        Returns
        -------
        The cached record when one exists and is not expired; otherwise None (also when the cache is disabled)
        """
        if not self.use_local_cache:
            return
        cached_record = self._cache.get(idempotency_key)
        if cached_record:
            if not cached_record.is_expired:
                return cached_record
            logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}")
            self._delete_from_cache(idempotency_key)

    def _delete_from_cache(self, idempotency_key: str):
        """
        Remove a record from the local cache if it is present; no-op when the cache is disabled

        Parameters
        ----------
        idempotency_key: str
            Hashed idempotency key
        """
        if not self.use_local_cache:
            return
        if idempotency_key in self._cache:
            del self._cache[idempotency_key]

    def save_success(self, event: Dict[str, Any], context: LambdaContext, result: dict) -> None:
        """
        Save record of function's execution completing successfully

        The result is JSON-serialized, written with `_update_record` and then offered to the local cache.

        Parameters
        ----------
        event: Dict[str, Any]
            Lambda event
        context: LambdaContext
            Lambda context
        result: dict
            The response from lambda handler
        """
        response_data = json.dumps(result, cls=Encoder)

        data_record = DataRecord(
            idempotency_key=self._get_hashed_idempotency_key(event, context),
            status=STATUS_CONSTANTS["COMPLETED"],
            expiry_timestamp=self._get_expiry_timestamp(),
            response_data=response_data,
            payload_hash=self._get_hashed_payload(event),
        )
        logger.debug(
            f"Lambda successfully executed. Saving record to persistence store with "
            f"idempotency key: {data_record.idempotency_key}"
        )
        self._update_record(data_record=data_record)

        self._save_to_cache(data_record)

    def save_inprogress(self, event: Dict[str, Any], context: LambdaContext) -> None:
        """
        Save record of function's execution being in progress

        Parameters
        ----------
        event: Dict[str, Any]
            Lambda event
        context: LambdaContext
            Lambda context

        Raises
        ------
        IdempotencyItemAlreadyExistsError
            A non-expired record for this idempotency key is already in the local cache
        """
        data_record = DataRecord(
            idempotency_key=self._get_hashed_idempotency_key(event, context),
            status=STATUS_CONSTANTS["INPROGRESS"],
            expiry_timestamp=self._get_expiry_timestamp(),
            payload_hash=self._get_hashed_payload(event),
        )

        logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

        # A cache hit means the record already exists, so skip the round trip to the store
        if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
            raise IdempotencyItemAlreadyExistsError

        self._put_record(data_record)

    def delete_record(self, event: Dict[str, Any], context: LambdaContext, exception: Exception):
        """
        Delete record from the persistence store and from the local cache

        Parameters
        ----------
        event: Dict[str, Any]
            Lambda event
        context: LambdaContext
            Lambda context
        exception
            The exception raised by the lambda handler
        """
        data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(event, context))

        logger.debug(
            f"Lambda raised an exception ({type(exception).__name__}). Clearing in progress record in persistence "
            f"store for idempotency key: {data_record.idempotency_key}"
        )
        self._delete_record(data_record)

        self._delete_from_cache(data_record.idempotency_key)

    def get_record(self, event: Dict[str, Any], context: LambdaContext) -> DataRecord:
        """
        Calculate idempotency key for lambda_event, then retrieve item from persistence store using idempotency key
        and return it as a DataRecord instance.

        The local cache is consulted first; a record fetched from the store is offered to the cache.

        Parameters
        ----------
        event: Dict[str, Any]
            Lambda event
        context: LambdaContext
            Lambda context

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        IdempotencyValidationError
            Event payload doesn't match the stored record for the given idempotency key
        """

        idempotency_key = self._get_hashed_idempotency_key(event, context)

        cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
        if cached_record:
            logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
            self._validate_payload(event, cached_record)
            return cached_record

        record = self._get_record(idempotency_key)

        self._save_to_cache(data_record=record)

        self._validate_payload(event, record)
        return record

    @abstractmethod
    def _get_record(self, idempotency_key) -> DataRecord:
        """
        Retrieve item from persistence store using idempotency key and return it as a DataRecord instance.

        Parameters
        ----------
        idempotency_key
            Hashed idempotency key of the record to fetch

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        """
        raise NotImplementedError

    @abstractmethod
    def _put_record(self, data_record: DataRecord) -> None:
        """
        Add a DataRecord to persistence store if it does not already exist with that key. Raise ItemAlreadyExists
        if a non-expired entry already exists.

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

    @abstractmethod
    def _update_record(self, data_record: DataRecord) -> None:
        """
        Update item in persistence store

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

    @abstractmethod
    def _delete_record(self, data_record: DataRecord) -> None:
        """
        Remove item from persistence store

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

Ancestors

  • abc.ABC

Subclasses

Static methods

def is_missing_idempotency_key(data) ‑> bool
Expand source code
@staticmethod
def is_missing_idempotency_key(data) -> bool:
    """
    Tell whether the data extracted for the idempotency key counts as missing

    Returns True when data is a tuple/list/dict whose items are all None (including empty
    containers; for a dict the keys are inspected), or any other falsy value such as None
    or "". Numbers and booleans are never missing, so 0 and False are usable keys.
    """
    if isinstance(data, (tuple, list, dict)):
        return all(x is None for x in data)
    if isinstance(data, (int, float, bool)):
        # 0, 0.0 and False are legitimate extracted values, not an absent key
        return False
    return not data

Methods

def configure(self, config: IdempotencyConfig) ‑> NoneType

Initialize the base persistence layer from the configuration settings

Parameters

config : IdempotencyConfig
Idempotency configuration settings
Expand source code
def configure(self, config: IdempotencyConfig) -> None:
    """
    Apply the configuration settings to this persistence layer

    Only the first call has any effect; later calls return immediately.

    Parameters
    ----------
    config: IdempotencyConfig
        Idempotency configuration settings
    """
    if self.configured:
        # Already configured once - keep the existing settings
        return
    self.configured = True

    # Idempotency key extraction
    key_expression = config.event_key_jmespath
    self.event_key_jmespath = key_expression
    if key_expression:
        self.event_key_compiled_jmespath = jmespath.compile(key_expression)

    # Fall back to the Powertools custom functions when no options were supplied
    self.jmespath_options = config.jmespath_options or {"custom_functions": PowertoolsFunctions()}

    # Payload validation is enabled only when an expression is given
    validation_expression = config.payload_validation_jmespath
    if validation_expression:
        self.validation_key_jmespath = jmespath.compile(validation_expression)
        self.payload_validation_enabled = True

    self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
    self.expires_after_seconds = config.expires_after_seconds

    self.use_local_cache = config.use_local_cache
    if self.use_local_cache:
        self._cache = LRUDict(max_items=config.local_cache_max_items)

    # hash_function is the name of a hashlib constructor, e.g. "md5"
    self.hash_function = getattr(hashlib, config.hash_function)
def delete_record(self, event: Dict[str, Any], context: LambdaContext, exception: Exception)

Delete record from the persistence store

Parameters

event : Dict[str, Any]
Lambda event
context : LambdaContext
Lambda context
exception
The exception raised by the lambda handler
Expand source code
def delete_record(self, event: Dict[str, Any], context: LambdaContext, exception: Exception):
    """
    Remove the record for this event from the persistence store and the local cache

    Parameters
    ----------
    event: Dict[str, Any]
        Lambda event
    context: LambdaContext
        Lambda context
    exception
        The exception raised by the lambda handler (only its type name is logged)
    """
    hashed_key = self._get_hashed_idempotency_key(event, context)
    data_record = DataRecord(idempotency_key=hashed_key)

    logger.debug(
        f"Lambda raised an exception ({type(exception).__name__}). Clearing in progress record in persistence "
        f"store for idempotency key: {data_record.idempotency_key}"
    )

    # Persistence store first, then the local cache
    self._delete_record(data_record)
    self._delete_from_cache(data_record.idempotency_key)
def get_record(self, event: Dict[str, Any], context: LambdaContext) ‑> DataRecord

Calculate idempotency key for lambda_event, then retrieve item from persistence store using idempotency key and return it as a DataRecord instance.

Parameters

event : Dict[str, Any]
Lambda event
context : LambdaContext
Lambda context

Returns

DataRecord
DataRecord representation of existing record found in persistence store

Raises

IdempotencyItemNotFoundError
Exception raised if no record exists in persistence store with the idempotency key
IdempotencyValidationError
Event payload doesn't match the stored record for the given idempotency key
Expand source code
def get_record(self, event: Dict[str, Any], context: LambdaContext) -> DataRecord:
    """
    Calculate idempotency key for lambda_event, then retrieve item from persistence store using idempotency key
    and return it as a DataRecord instance.

    The local cache is consulted first; a record fetched from the store is offered to the cache.

    Parameters
    ----------
    event: Dict[str, Any]
        Lambda event
    context: LambdaContext
        Lambda context

    Returns
    -------
    DataRecord
        DataRecord representation of existing record found in persistence store

    Raises
    ------
    IdempotencyItemNotFoundError
        Exception raised if no record exists in persistence store with the idempotency key
    IdempotencyValidationError
        Event payload doesn't match the stored record for the given idempotency key
    """

    idempotency_key = self._get_hashed_idempotency_key(event, context)

    record = self._retrieve_from_cache(idempotency_key=idempotency_key)
    if record:
        logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
    else:
        # Cache miss: go to the persistence store and remember the result
        record = self._get_record(idempotency_key)
        self._save_to_cache(data_record=record)

    # Both paths validate the payload before handing the record back
    self._validate_payload(event, record)
    return record
def save_inprogress(self, event: Dict[str, Any], context: LambdaContext) ‑> NoneType

Save record of function's execution being in progress

Parameters

event : Dict[str, Any]
Lambda event
context : LambdaContext
Lambda context
Expand source code
def save_inprogress(self, event: Dict[str, Any], context: LambdaContext) -> None:
    """
    Record that the function's execution for this event is in progress

    Parameters
    ----------
    event: Dict[str, Any]
        Lambda event
    context: LambdaContext
        Lambda context

    Raises
    ------
    IdempotencyItemAlreadyExistsError
        A non-expired record for this idempotency key is already in the local cache
    """
    hashed_key = self._get_hashed_idempotency_key(event, context)
    expires_at = self._get_expiry_timestamp()
    payload_hash = self._get_hashed_payload(event)
    data_record = DataRecord(
        idempotency_key=hashed_key,
        status=STATUS_CONSTANTS["INPROGRESS"],
        expiry_timestamp=expires_at,
        payload_hash=payload_hash,
    )

    logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

    # A cache hit means the record already exists, so skip the round trip to the store
    already_cached = self._retrieve_from_cache(idempotency_key=data_record.idempotency_key)
    if already_cached:
        raise IdempotencyItemAlreadyExistsError

    self._put_record(data_record)
def save_success(self, event: Dict[str, Any], context: LambdaContext, result: dict) ‑> NoneType

Save record of function's execution completing successfully

Parameters

event : Dict[str, Any]
Lambda event
context : LambdaContext
Lambda context
result : dict
The response from lambda handler
Expand source code
def save_success(self, event: Dict[str, Any], context: LambdaContext, result: dict) -> None:
    """
    Record that the function's execution completed successfully

    The result is JSON-serialized, written with `_update_record` and then offered to the local cache.

    Parameters
    ----------
    event: Dict[str, Any]
        Lambda event
    context: LambdaContext
        Lambda context
    result: dict
        The response from lambda handler
    """
    serialized_result = json.dumps(result, cls=Encoder)
    hashed_key = self._get_hashed_idempotency_key(event, context)
    expires_at = self._get_expiry_timestamp()
    payload_hash = self._get_hashed_payload(event)

    data_record = DataRecord(
        idempotency_key=hashed_key,
        status=STATUS_CONSTANTS["COMPLETED"],
        expiry_timestamp=expires_at,
        response_data=serialized_result,
        payload_hash=payload_hash,
    )
    logger.debug(
        f"Lambda successfully executed. Saving record to persistence store with "
        f"idempotency key: {data_record.idempotency_key}"
    )

    # Persistence store first, then the local cache
    self._update_record(data_record=data_record)
    self._save_to_cache(data_record)
class DynamoDBPersistenceLayer (table_name: str, key_attr: str = 'id', expiry_attr: str = 'expiration', status_attr: str = 'status', data_attr: str = 'data', validation_key_attr: str = 'validation', boto_config: Union[botocore.config.Config, NoneType] = None, boto3_session: Union[boto3.session.Session, NoneType] = None)

DynamoDB implementation of the Idempotency persistence layer.

Initialize the DynamoDB client

Parameters

table_name : str
Name of the table to use for storing execution records
key_attr : str, optional
DynamoDB attribute name for key, by default "id"
expiry_attr : str, optional
DynamoDB attribute name for expiry timestamp, by default "expiration"
status_attr : str, optional
DynamoDB attribute name for status, by default "status"
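data_attr : str, optional
DynamoDB attribute name for response data, by default "data"
validation_key_attr : str, optional
DynamoDB attribute name for the payload validation hash, by default "validation"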
boto_config : botocore.config.Config, optional
Botocore configuration to pass during client initialization
boto3_session : boto3.session.Session, optional
Boto3 session to use for AWS API communication

Examples

Create a DynamoDB persistence layer with custom settings

>>> from aws_lambda_powertools.utilities.idempotency import (
>>>    idempotent, DynamoDBPersistenceLayer
>>> )
>>>
>>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store")
>>>
>>> @idempotent(persistence_store=persistence_store)
>>> def handler(event, context):
>>>     return {"StatusCode": 200}
Expand source code
class DynamoDBPersistenceLayer(BasePersistenceLayer):
    """
    DynamoDB implementation of the Idempotency persistence layer.

    Records are stored as items of a single table; the attribute names used for the key,
    expiry timestamp, status, response data and payload validation hash are configurable.
    """

    def __init__(
        self,
        table_name: str,
        key_attr: str = "id",
        expiry_attr: str = "expiration",
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
        boto_config: Optional[Config] = None,
        boto3_session: Optional[boto3.session.Session] = None,
    ):
        """
        Initialize the DynamoDB client

        Parameters
        ----------
        table_name: str
            Name of the table to use for storing execution records
        key_attr: str, optional
            DynamoDB attribute name for key, by default "id"
        expiry_attr: str, optional
            DynamoDB attribute name for expiry timestamp, by default "expiration"
        status_attr: str, optional
            DynamoDB attribute name for status, by default "status"
        data_attr: str, optional
            DynamoDB attribute name for response data, by default "data"
        validation_key_attr: str, optional
            DynamoDB attribute name for the payload validation hash, by default "validation"
        boto_config: botocore.config.Config, optional
            Botocore configuration to pass during client initialization
        boto3_session : boto3.session.Session, optional
            Boto3 session to use for AWS API communication

        Examples
        --------
        **Create a DynamoDB persistence layer with custom settings**

            >>> from aws_lambda_powertools.utilities.idempotency import (
            >>>    idempotent, DynamoDBPersistenceLayer
            >>> )
            >>>
            >>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store")
            >>>
            >>> @idempotent(persistence_store=persistence_store)
            >>> def handler(event, context):
            >>>     return {"StatusCode": 200}
        """

        boto_config = boto_config or Config()
        session = boto3_session or boto3.session.Session()
        self._ddb_resource = session.resource("dynamodb", config=boto_config)
        self.table_name = table_name
        self.table = self._ddb_resource.Table(self.table_name)
        self.key_attr = key_attr
        self.expiry_attr = expiry_attr
        self.status_attr = status_attr
        self.data_attr = data_attr
        self.validation_key_attr = validation_key_attr
        super().__init__()

    def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
        """
        Translate raw item records from DynamoDB to DataRecord

        Key, status and expiry attributes are required; response data and the validation
        hash are optional and default to None when absent from the item.

        Parameters
        ----------
        item: Dict[str, Any]
            Item format from dynamodb response

        Returns
        -------
        DataRecord
            representation of item

        """
        return DataRecord(
            idempotency_key=item[self.key_attr],
            status=item[self.status_attr],
            expiry_timestamp=item[self.expiry_attr],
            response_data=item.get(self.data_attr),
            payload_hash=item.get(self.validation_key_attr),
        )

    def _get_record(self, idempotency_key) -> DataRecord:
        """
        Fetch the item for idempotency_key with a consistent read and convert it to a DataRecord

        Raises
        ------
        IdempotencyItemNotFoundError
            The response contains no item for the key
        """
        response = self.table.get_item(Key={self.key_attr: idempotency_key}, ConsistentRead=True)

        try:
            item = response["Item"]
        except KeyError:
            raise IdempotencyItemNotFoundError
        return self._item_to_data_record(item)

    def _put_record(self, data_record: DataRecord) -> None:
        """
        Write a new item unless a non-expired item with the same key already exists

        Raises
        ------
        IdempotencyItemAlreadyExistsError
            The conditional write was rejected by DynamoDB
        """
        item = {
            self.key_attr: data_record.idempotency_key,
            self.expiry_attr: data_record.expiry_timestamp,
            self.status_attr: data_record.status,
        }

        if self.payload_validation_enabled:
            item[self.validation_key_attr] = data_record.payload_hash

        now = datetime.datetime.now()
        try:
            logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")
            # Attribute names go through placeholders (as in _update_record) so that custom
            # key/expiry attribute names colliding with DynamoDB reserved words are accepted
            self.table.put_item(
                Item=item,
                ConditionExpression="attribute_not_exists(#id) OR #expiry < :now",
                ExpressionAttributeNames={"#id": self.key_attr, "#expiry": self.expiry_attr},
                ExpressionAttributeValues={":now": int(now.timestamp())},
            )
        except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException:
            logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
            raise IdempotencyItemAlreadyExistsError

    def _update_record(self, data_record: DataRecord) -> None:
        """
        Set response data, expiry and status (plus the validation hash when payload validation is enabled)
        on the item identified by the record's idempotency key
        """
        logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
        update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
        expression_attr_values = {
            ":expiry": data_record.expiry_timestamp,
            ":response_data": data_record.response_data,
            ":status": data_record.status,
        }
        expression_attr_names = {
            "#response_data": self.data_attr,
            "#expiry": self.expiry_attr,
            "#status": self.status_attr,
        }

        if self.payload_validation_enabled:
            update_expression += ", #validation_key = :validation_key"
            expression_attr_values[":validation_key"] = data_record.payload_hash
            expression_attr_names["#validation_key"] = self.validation_key_attr

        kwargs = {
            "Key": {self.key_attr: data_record.idempotency_key},
            "UpdateExpression": update_expression,
            "ExpressionAttributeValues": expression_attr_values,
            "ExpressionAttributeNames": expression_attr_names,
        }

        self.table.update_item(**kwargs)

    def _delete_record(self, data_record: DataRecord) -> None:
        """Delete the item identified by the record's idempotency key"""
        logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}")
        self.table.delete_item(Key={self.key_attr: data_record.idempotency_key})

Ancestors

Inherited members

class IdempotencyConfig (event_key_jmespath: str = '', payload_validation_jmespath: str = '', jmespath_options: Dict = None, raise_on_no_idempotency_key: bool = False, expires_after_seconds: int = 3600, use_local_cache: bool = False, local_cache_max_items: int = 256, hash_function: str = 'md5')

Initialize the idempotency configuration settings

Parameters

event_key_jmespath : str
A jmespath expression to extract the idempotency key from the event record
payload_validation_jmespath : str
A jmespath expression to extract the payload to be validated from the event record
raise_on_no_idempotency_key : bool, optional
Raise exception if no idempotency key was found in the request, by default False
expires_after_seconds : int
The number of seconds to wait before a record is expired
use_local_cache : bool, optional
Whether to locally cache idempotency results, by default False
local_cache_max_items : int, optional
Max number of items to store in local cache, by default 256
hash_function : str, optional
Function to use for calculating hashes, by default md5.
Expand source code
class IdempotencyConfig:
    def __init__(
        self,
        event_key_jmespath: str = "",
        payload_validation_jmespath: str = "",
        jmespath_options: Dict = None,
        raise_on_no_idempotency_key: bool = False,
        expires_after_seconds: int = 60 * 60,  # 1 hour default
        use_local_cache: bool = False,
        local_cache_max_items: int = 256,
        hash_function: str = "md5",
    ):
        """
        Initialize the idempotency configuration settings

        The values are stored unchanged as attributes of the same name.

        Parameters
        ----------
        event_key_jmespath: str
            A jmespath expression to extract the idempotency key from the event record
        payload_validation_jmespath: str
            A jmespath expression to extract the payload to be validated from the event record
        jmespath_options: Dict, optional
            Keyword arguments for jmespath.Options used when extracting the idempotency key, by default None
        raise_on_no_idempotency_key: bool, optional
            Raise exception if no idempotency key was found in the request, by default False
        expires_after_seconds: int
            The number of seconds to wait before a record is expired, by default 3600 (1 hour)
        use_local_cache: bool, optional
            Whether to locally cache idempotency results, by default False
        local_cache_max_items: int, optional
            Max number of items to store in local cache, by default 256
        hash_function: str, optional
            Name of the hashlib function to use for calculating hashes, by default md5.
        """
        self.event_key_jmespath = event_key_jmespath
        self.payload_validation_jmespath = payload_validation_jmespath
        self.jmespath_options = jmespath_options
        self.raise_on_no_idempotency_key = raise_on_no_idempotency_key
        self.expires_after_seconds = expires_after_seconds
        self.use_local_cache = use_local_cache
        self.local_cache_max_items = local_cache_max_items
        self.hash_function = hash_function