Module aws_lambda_powertools.utilities.idempotency

Utility for adding idempotency to lambda functions

Sub-modules

aws_lambda_powertools.utilities.idempotency.base
aws_lambda_powertools.utilities.idempotency.config
aws_lambda_powertools.utilities.idempotency.exceptions

Idempotency errors

aws_lambda_powertools.utilities.idempotency.hook
aws_lambda_powertools.utilities.idempotency.idempotency

Primary interface for idempotent Lambda functions utility

aws_lambda_powertools.utilities.idempotency.persistence
aws_lambda_powertools.utilities.idempotency.serialization

Functions

def idempotent(handler: Callable[[Any, LambdaContext], Any], event: Dict[str, Any], context: LambdaContext, persistence_store: BasePersistenceLayer, config: Optional[IdempotencyConfig] = None, **kwargs) ‑> Any

Decorator to handle idempotency

Parameters

handler : Callable
Lambda's handler
event : Dict
Lambda's Event
context : Dict
Lambda's Context
persistence_store : BasePersistenceLayer
Instance of BasePersistenceLayer to store data
config : IdempotencyConfig
Configuration

Examples

Processes Lambda's event in an idempotent manner

>>> from aws_lambda_powertools.utilities.idempotency import (
>>>    idempotent, DynamoDBPersistenceLayer, IdempotencyConfig
>>> )
>>>
>>> idem_config=IdempotencyConfig(event_key_jmespath="body")
>>> persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")
>>>
>>> @idempotent(config=idem_config, persistence_store=persistence_layer)
>>> def handler(event, context):
>>>     return {"StatusCode": 200}
def idempotent_function(function: Optional[~AnyCallableT] = None, *, data_keyword_argument: str, persistence_store: BasePersistenceLayer, config: Optional[IdempotencyConfig] = None, output_serializer: Union[BaseIdempotencySerializer, Type[BaseIdempotencyModelSerializer], ForwardRef(None)] = None, **kwargs: Any) ‑> Any

Decorator to handle idempotency of any function

Parameters

function : Callable
Function to be decorated
data_keyword_argument : str
Keyword parameter name in function's signature that we should hash as idempotency key, e.g. "order"
persistence_store : BasePersistenceLayer
Instance of BasePersistenceLayer to store data
config : IdempotencyConfig
Configuration
output_serializer : Optional[Union[BaseIdempotencySerializer, Type[BaseIdempotencyModelSerializer]]]
Serializer to transform the data to and from a dictionary. If not supplied, no serialization is done via the NoOpSerializer. In case a serializer of type inheriting BaseIdempotencyModelSerializer is given, the serializer is derived from the function return type.

Examples

Processes an order in an idempotent manner

from aws_lambda_powertools.utilities.idempotency import (
   idempotent_function, DynamoDBPersistenceLayer, IdempotencyConfig
)

idem_config=IdempotencyConfig(event_key_jmespath="order_id")
persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")

@idempotent_function(data_keyword_argument="order", config=idem_config, persistence_store=persistence_layer)
def process_order(customer_id: str, order: dict, **kwargs):
    return {"StatusCode": 200}

Classes

class BasePersistenceLayer

Abstract Base Class for Idempotency persistence layer.

Initialize the defaults

Expand source code
class BasePersistenceLayer(ABC):
    """
    Abstract Base Class for Idempotency persistence layer.
    """

    def __init__(self):
        """Initialize the defaults"""
        self.function_name = ""
        self.configured = False
        self.event_key_jmespath: str = ""
        self.event_key_compiled_jmespath = None
        self.jmespath_options: Optional[dict] = None
        self.payload_validation_enabled = False
        self.validation_key_jmespath = None
        self.raise_on_no_idempotency_key = False
        self.expires_after_seconds: int = 60 * 60  # 1 hour default
        self.use_local_cache = False
        self.hash_function = hashlib.md5

    def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) -> None:
        """
        Initialize the base persistence layer from the configuration settings

        Parameters
        ----------
        config: IdempotencyConfig
            Idempotency configuration settings
        function_name: str, Optional
            The name of the function being decorated
        """
        self.function_name = f"{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, 'test-func')}.{function_name or ''}"

        if self.configured:
            # Prevent being reconfigured multiple times
            return
        self.configured = True

        self.event_key_jmespath = config.event_key_jmespath
        if config.event_key_jmespath:
            self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath)
        self.jmespath_options = config.jmespath_options
        if not self.jmespath_options:
            self.jmespath_options = {"custom_functions": PowertoolsFunctions()}
        if config.payload_validation_jmespath:
            self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath)
            self.payload_validation_enabled = True
        self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
        self.expires_after_seconds = config.expires_after_seconds
        self.use_local_cache = config.use_local_cache
        if self.use_local_cache:
            self._cache = LRUDict(max_items=config.local_cache_max_items)
        self.hash_function = getattr(hashlib, config.hash_function)

    def _get_hashed_idempotency_key(self, data: Dict[str, Any]) -> Optional[str]:
        """
        Extract idempotency key and return a hashed representation

        Parameters
        ----------
        data: Dict[str, Any]
            Incoming data

        Returns
        -------
        str
            Hashed representation of the data extracted by the jmespath expression

        """
        if self.event_key_jmespath:
            data = self.event_key_compiled_jmespath.search(data, options=jmespath.Options(**self.jmespath_options))

        if self.is_missing_idempotency_key(data=data):
            if self.raise_on_no_idempotency_key:
                raise IdempotencyKeyError("No data found to create a hashed idempotency_key")

            warnings.warn(
                f"No idempotency key value found. Skipping persistence layer and validation operations. jmespath: {self.event_key_jmespath}",  # noqa: E501
                stacklevel=2,
            )
            return None

        generated_hash = self._generate_hash(data=data)
        return f"{self.function_name}#{generated_hash}"

    @staticmethod
    def is_missing_idempotency_key(data) -> bool:
        if isinstance(data, (tuple, list, dict)):
            return all(x is None for x in data)
        elif isinstance(data, (int, float, bool)):
            return False
        return not data

    def _get_hashed_payload(self, data: Dict[str, Any]) -> str:
        """
        Extract payload using validation key jmespath and return a hashed representation

        Parameters
        ----------
        data: Dict[str, Any]
            Payload

        Returns
        -------
        str
            Hashed representation of the data extracted by the jmespath expression

        """
        if not self.payload_validation_enabled:
            return ""
        data = self.validation_key_jmespath.search(data)
        return self._generate_hash(data=data)

    def _generate_hash(self, data: Any) -> str:
        """
        Generate a hash value from the provided data

        Parameters
        ----------
        data: Any
            The data to hash

        Returns
        -------
        str
            Hashed representation of the provided data

        """
        hashed_data = self.hash_function(json.dumps(data, cls=Encoder, sort_keys=True).encode())
        return hashed_data.hexdigest()

    def _validate_payload(
        self,
        data_payload: Union[Dict[str, Any], DataRecord],
        stored_data_record: DataRecord,
    ) -> None:
        """
        Validate that the hashed payload matches data provided and stored data record

        Parameters
        ----------
        data_payload: Union[Dict[str, Any], DataRecord]
            Payload
        stored_data_record: DataRecord
            DataRecord fetched from Dynamo or cache

        Raises
        ----------
        IdempotencyValidationError
            Payload doesn't match the stored record for the given idempotency key

        """
        if self.payload_validation_enabled:
            if isinstance(data_payload, DataRecord):
                data_hash = data_payload.payload_hash
            else:
                data_hash = self._get_hashed_payload(data=data_payload)

            if stored_data_record.payload_hash != data_hash:
                raise IdempotencyValidationError("Payload does not match stored record for this event key")

    def _get_expiry_timestamp(self) -> int:
        """

        Returns
        -------
        int
            unix timestamp of expiry date for idempotency record

        """
        now = datetime.datetime.now()
        period = datetime.timedelta(seconds=self.expires_after_seconds)
        return int((now + period).timestamp())

    def _save_to_cache(self, data_record: DataRecord):
        """
        Save data_record to local cache except when status is "INPROGRESS"

        NOTE: We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the
        execution environment

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance

        Returns
        -------

        """
        if not self.use_local_cache:
            return
        if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
            return
        self._cache[data_record.idempotency_key] = data_record

    def _retrieve_from_cache(self, idempotency_key: str):
        if not self.use_local_cache:
            return
        cached_record = self._cache.get(key=idempotency_key)
        if cached_record:
            if not cached_record.is_expired:
                return cached_record
            logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}")
            self._delete_from_cache(idempotency_key=idempotency_key)

    def _delete_from_cache(self, idempotency_key: str):
        if not self.use_local_cache:
            return
        if idempotency_key in self._cache:
            del self._cache[idempotency_key]

    def save_success(self, data: Dict[str, Any], result: dict) -> None:
        """
        Save record of function's execution completing successfully

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        result: dict
            The response from function
        """
        idempotency_key = self._get_hashed_idempotency_key(data=data)
        if idempotency_key is None:
            # If the idempotency key is None, no data will be saved in the Persistence Layer.
            # See: https://github.com/aws-powertools/powertools-lambda-python/issues/2465
            return None

        response_data = json.dumps(result, cls=Encoder, sort_keys=True)

        data_record = DataRecord(
            idempotency_key=idempotency_key,
            status=STATUS_CONSTANTS["COMPLETED"],
            expiry_timestamp=self._get_expiry_timestamp(),
            response_data=response_data,
            payload_hash=self._get_hashed_payload(data=data),
        )
        logger.debug(
            f"Function successfully executed. Saving record to persistence store with "
            f"idempotency key: {data_record.idempotency_key}",
        )
        self._update_record(data_record=data_record)

        self._save_to_cache(data_record=data_record)

    def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
        """
        Save record of function's execution being in progress

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        remaining_time_in_millis: Optional[int]
            If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
        """

        idempotency_key = self._get_hashed_idempotency_key(data=data)
        if idempotency_key is None:
            # If the idempotency key is None, no data will be saved in the Persistence Layer.
            # See: https://github.com/aws-powertools/powertools-lambda-python/issues/2465
            return None

        data_record = DataRecord(
            idempotency_key=idempotency_key,
            status=STATUS_CONSTANTS["INPROGRESS"],
            expiry_timestamp=self._get_expiry_timestamp(),
            payload_hash=self._get_hashed_payload(data=data),
        )

        if remaining_time_in_millis:
            now = datetime.datetime.now()
            period = datetime.timedelta(milliseconds=remaining_time_in_millis)
            timestamp = (now + period).timestamp()
            data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
        else:
            warnings.warn(
                "Couldn't determine the remaining time left. "
                "Did you call register_lambda_context on IdempotencyConfig?",
                stacklevel=2,
            )

        logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

        if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
            raise IdempotencyItemAlreadyExistsError

        self._put_record(data_record=data_record)

    def delete_record(self, data: Dict[str, Any], exception: Exception):
        """
        Delete record from the persistence store

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        exception
            The exception raised by the function
        """

        idempotency_key = self._get_hashed_idempotency_key(data=data)
        if idempotency_key is None:
            # If the idempotency key is None, no data will be saved in the Persistence Layer.
            # See: https://github.com/aws-powertools/powertools-lambda-python/issues/2465
            return None

        data_record = DataRecord(idempotency_key=idempotency_key)

        logger.debug(
            f"Function raised an exception ({type(exception).__name__}). Clearing in progress record in persistence "
            f"store for idempotency key: {data_record.idempotency_key}",
        )
        self._delete_record(data_record=data_record)

        self._delete_from_cache(idempotency_key=data_record.idempotency_key)

    def get_record(self, data: Dict[str, Any]) -> Optional[DataRecord]:
        """
        Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

        Parameters
        ----------
        data: Dict[str, Any]
            Payload

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        IdempotencyValidationError
            Payload doesn't match the stored record for the given idempotency key
        """

        idempotency_key = self._get_hashed_idempotency_key(data=data)
        if idempotency_key is None:
            # If the idempotency key is None, no data will be saved in the Persistence Layer.
            # See: https://github.com/aws-powertools/powertools-lambda-python/issues/2465
            return None

        cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
        if cached_record:
            logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
            self._validate_payload(data_payload=data, stored_data_record=cached_record)
            return cached_record

        record = self._get_record(idempotency_key=idempotency_key)

        self._validate_payload(data_payload=data, stored_data_record=record)
        self._save_to_cache(data_record=record)

        return record

    @abstractmethod
    def _get_record(self, idempotency_key) -> DataRecord:
        """
        Retrieve item from persistence store using idempotency key and return it as a DataRecord instance.

        Parameters
        ----------
        idempotency_key

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        """
        raise NotImplementedError

    @abstractmethod
    def _put_record(self, data_record: DataRecord) -> None:
        """
        Add a DataRecord to persistence store if it does not already exist with that key. Raise ItemAlreadyExists
        if a non-expired entry already exists.

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

    @abstractmethod
    def _update_record(self, data_record: DataRecord) -> None:
        """
        Update item in persistence store

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

    @abstractmethod
    def _delete_record(self, data_record: DataRecord) -> None:
        """
        Remove item from persistence store
        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

Ancestors

  • abc.ABC

Subclasses

Static methods

def is_missing_idempotency_key(data) ‑> bool

Methods

def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) ‑> None

Initialize the base persistence layer from the configuration settings

Parameters

config : IdempotencyConfig
Idempotency configuration settings
function_name : str, Optional
The name of the function being decorated
def delete_record(self, data: Dict[str, Any], exception: Exception)

Delete record from the persistence store

Parameters

data : Dict[str, Any]
Payload
exception
The exception raised by the function
def get_record(self, data: Dict[str, Any]) ‑> Optional[DataRecord]

Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

Parameters

data : Dict[str, Any]
Payload

Returns

DataRecord
DataRecord representation of existing record found in persistence store

Raises

IdempotencyItemNotFoundError
Exception raised if no record exists in persistence store with the idempotency key
IdempotencyValidationError
Payload doesn't match the stored record for the given idempotency key
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) ‑> None

Save record of function's execution being in progress

Parameters

data : Dict[str, Any]
Payload
remaining_time_in_millis : Optional[int]
If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
def save_success(self, data: Dict[str, Any], result: dict) ‑> None

Save record of function's execution completing successfully

Parameters

data : Dict[str, Any]
Payload
result : dict
The response from function
class DynamoDBPersistenceLayer (table_name: str, key_attr: str = 'id', static_pk_value: Optional[str] = None, sort_key_attr: Optional[str] = None, expiry_attr: str = 'expiration', in_progress_expiry_attr: str = 'in_progress_expiration', status_attr: str = 'status', data_attr: str = 'data', validation_key_attr: str = 'validation', boto_config: Optional[Config] = None, boto3_session: Optional[boto3.session.Session] = None, boto3_client: "'DynamoDBClient' | None" = None)

Abstract Base Class for Idempotency persistence layer.

Initialize the DynamoDB client

Parameters

table_name : str
Name of the table to use for storing execution records
key_attr : str, optional
DynamoDB attribute name for partition key, by default "id"
static_pk_value : str, optional
DynamoDB attribute value for partition key, by default "idempotency#". This will be used if the sort_key_attr is set.
sort_key_attr : str, optional
DynamoDB attribute name for the sort key
expiry_attr : str, optional
DynamoDB attribute name for expiry timestamp, by default "expiration"
in_progress_expiry_attr : str, optional
DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
status_attr : str, optional
DynamoDB attribute name for status, by default "status"
data_attr : str, optional
DynamoDB attribute name for response data, by default "data"
validation_key_attr : str, optional
DynamoDB attribute name for hashed representation of the parts of the event used for validation
boto_config : botocore.config.Config, optional
Botocore configuration to pass during client initialization
boto3_session : boto3.Session, optional
Boto3 session to use for AWS API communication
boto3_client : DynamoDBClient, optional
Boto3 DynamoDB Client to use, boto3_session and boto_config will be ignored if both are provided

Examples

Create a DynamoDB persistence layer with custom settings

>>> from aws_lambda_powertools.utilities.idempotency import (
>>>    idempotent, DynamoDBPersistenceLayer
>>> )
>>>
>>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store")
>>>
>>> @idempotent(persistence_store=persistence_store)
>>> def handler(event, context):
>>>     return {"StatusCode": 200}
Expand source code
class DynamoDBPersistenceLayer(BasePersistenceLayer):
    def __init__(
        self,
        table_name: str,
        key_attr: str = "id",
        static_pk_value: Optional[str] = None,
        sort_key_attr: Optional[str] = None,
        expiry_attr: str = "expiration",
        in_progress_expiry_attr: str = "in_progress_expiration",
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
        boto_config: Optional[Config] = None,
        boto3_session: Optional[boto3.session.Session] = None,
        boto3_client: "DynamoDBClient" | None = None,
    ):
        """
        Initialize the DynamoDB client

        Parameters
        ----------
        table_name: str
            Name of the table to use for storing execution records
        key_attr: str, optional
            DynamoDB attribute name for partition key, by default "id"
        static_pk_value: str, optional
            DynamoDB attribute value for partition key, by default "idempotency#<function-name>".
            This will be used if the sort_key_attr is set.
        sort_key_attr: str, optional
            DynamoDB attribute name for the sort key
        expiry_attr: str, optional
            DynamoDB attribute name for expiry timestamp, by default "expiration"
        in_progress_expiry_attr: str, optional
            DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
        status_attr: str, optional
            DynamoDB attribute name for status, by default "status"
        data_attr: str, optional
            DynamoDB attribute name for response data, by default "data"
        validation_key_attr: str, optional
            DynamoDB attribute name for hashed representation of the parts of the event used for validation
        boto_config: botocore.config.Config, optional
            Botocore configuration to pass during client initialization
        boto3_session : boto3.Session, optional
            Boto3 session to use for AWS API communication
        boto3_client : DynamoDBClient, optional
            Boto3 DynamoDB Client to use, boto3_session and boto_config will be ignored if both are provided

        Examples
        --------
        **Create a DynamoDB persistence layer with custom settings**

            >>> from aws_lambda_powertools.utilities.idempotency import (
            >>>    idempotent, DynamoDBPersistenceLayer
            >>> )
            >>>
            >>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store")
            >>>
            >>> @idempotent(persistence_store=persistence_store)
            >>> def handler(event, context):
            >>>     return {"StatusCode": 200}
        """
        if boto3_client is None:
            self._boto_config = boto_config or Config()
            self._boto3_session: boto3.Session = boto3_session or boto3.session.Session()
            self.client: "DynamoDBClient" = self._boto3_session.client("dynamodb", config=self._boto_config)
        else:
            self.client = boto3_client

        user_agent.register_feature_to_client(client=self.client, feature="idempotency")

        if sort_key_attr == key_attr:
            raise ValueError(f"key_attr [{key_attr}] and sort_key_attr [{sort_key_attr}] cannot be the same!")

        if static_pk_value is None:
            static_pk_value = f"idempotency#{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, '')}"

        self.table_name = table_name
        self.key_attr = key_attr
        self.static_pk_value = static_pk_value
        self.sort_key_attr = sort_key_attr
        self.expiry_attr = expiry_attr
        self.in_progress_expiry_attr = in_progress_expiry_attr
        self.status_attr = status_attr
        self.data_attr = data_attr
        self.validation_key_attr = validation_key_attr

        # Use DynamoDB's ReturnValuesOnConditionCheckFailure to optimize put and get operations and optimize costs.
        # This feature is supported in boto3 versions 1.26.164 and later.
        self.return_value_on_condition = (
            {"ReturnValuesOnConditionCheckFailure": "ALL_OLD"}
            if self.boto3_supports_condition_check_failure(boto3.__version__)
            else {}
        )

        self._deserializer = TypeDeserializer()

        super(DynamoDBPersistenceLayer, self).__init__()

    def _get_key(self, idempotency_key: str) -> dict:
        """Build primary key attribute simple or composite based on params.

        When sort_key_attr is set, we must return a composite key with static_pk_value,
        otherwise we use the idempotency key given.

        Parameters
        ----------
        idempotency_key : str
            idempotency key to use for simple primary key

        Returns
        -------
        dict
            simple or composite key for DynamoDB primary key
        """
        if self.sort_key_attr:
            return {self.key_attr: {"S": self.static_pk_value}, self.sort_key_attr: {"S": idempotency_key}}
        return {self.key_attr: {"S": idempotency_key}}

    def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
        """
        Translate raw item records from DynamoDB to DataRecord

        Parameters
        ----------
        item: Dict[str, Union[str, int]]
            Item format from dynamodb response

        Returns
        -------
        DataRecord
            representation of item

        """
        data = self._deserializer.deserialize({"M": item})
        return DataRecord(
            idempotency_key=data[self.key_attr],
            status=data[self.status_attr],
            expiry_timestamp=data[self.expiry_attr],
            in_progress_expiry_timestamp=data.get(self.in_progress_expiry_attr),
            response_data=data.get(self.data_attr),
            payload_hash=data.get(self.validation_key_attr),
        )

    def _get_record(self, idempotency_key) -> DataRecord:
        response = self.client.get_item(
            TableName=self.table_name,
            Key=self._get_key(idempotency_key),
            ConsistentRead=True,
        )
        try:
            item = response["Item"]
        except KeyError as exc:
            raise IdempotencyItemNotFoundError from exc
        return self._item_to_data_record(item)

    def _put_record(self, data_record: DataRecord) -> None:
        item = {
            # get simple or composite primary key
            **self._get_key(data_record.idempotency_key),
            self.expiry_attr: {"N": str(data_record.expiry_timestamp)},
            self.status_attr: {"S": data_record.status},
        }

        if data_record.in_progress_expiry_timestamp is not None:
            item[self.in_progress_expiry_attr] = {"N": str(data_record.in_progress_expiry_timestamp)}

        if self.payload_validation_enabled and data_record.payload_hash:
            item[self.validation_key_attr] = {"S": data_record.payload_hash}

        now = datetime.datetime.now()
        try:
            logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")

            # |     LOCKED     |         RETRY if status = "INPROGRESS"                |     RETRY
            # |----------------|-------------------------------------------------------|-------------> .... (time)
            # |             Lambda                                              Idempotency Record
            # |             Timeout                                                 Timeout
            # |       (in_progress_expiry)                                          (expiry)

            # Conditions to successfully save a record:

            # The idempotency key does not exist:
            #    - first time that this invocation key is used
            #    - previous invocation with the same key was deleted due to TTL
            idempotency_key_not_exist = "attribute_not_exists(#id)"

            # The idempotency record exists but it's expired:
            idempotency_expiry_expired = "#expiry < :now"

            # The status of the record is "INPROGRESS", there is an in-progress expiry timestamp, but it's expired
            inprogress_expiry_expired = " AND ".join(
                [
                    "#status = :inprogress",
                    "attribute_exists(#in_progress_expiry)",
                    "#in_progress_expiry < :now_in_millis",
                ],
            )

            condition_expression = (
                f"{idempotency_key_not_exist} OR {idempotency_expiry_expired} OR ({inprogress_expiry_expired})"
            )

            self.client.put_item(
                TableName=self.table_name,
                Item=item,
                ConditionExpression=condition_expression,
                ExpressionAttributeNames={
                    "#id": self.key_attr,
                    "#expiry": self.expiry_attr,
                    "#in_progress_expiry": self.in_progress_expiry_attr,
                    "#status": self.status_attr,
                },
                ExpressionAttributeValues={
                    ":now": {"N": str(int(now.timestamp()))},
                    ":now_in_millis": {"N": str(int(now.timestamp() * 1000))},
                    ":inprogress": {"S": STATUS_CONSTANTS["INPROGRESS"]},
                },
                **self.return_value_on_condition,  # type: ignore
            )
        except ClientError as exc:
            error_code = exc.response.get("Error", {}).get("Code")
            if error_code == "ConditionalCheckFailedException":
                old_data_record = self._item_to_data_record(exc.response["Item"]) if "Item" in exc.response else None
                if old_data_record is not None:
                    logger.debug(
                        f"Failed to put record for already existing idempotency key: "
                        f"{data_record.idempotency_key} with status: {old_data_record.status}, "
                        f"expiry_timestamp: {old_data_record.expiry_timestamp}, "
                        f"and in_progress_expiry_timestamp: {old_data_record.in_progress_expiry_timestamp}",
                    )

                    try:
                        self._validate_payload(data_payload=data_record, stored_data_record=old_data_record)
                        self._save_to_cache(data_record=old_data_record)
                    except IdempotencyValidationError as idempotency_validation_error:
                        raise idempotency_validation_error from exc

                    raise IdempotencyItemAlreadyExistsError(old_data_record=old_data_record) from exc

                logger.debug(
                    f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}",
                )
                raise IdempotencyItemAlreadyExistsError() from exc

            raise

    @staticmethod
    def boto3_supports_condition_check_failure(boto3_version: str) -> bool:
        """
        Check if the installed boto3 version supports condition check failure.

        Params
        ------
        boto3_version: str
            The boto3 version

        Returns
        -------
        bool
            True if the boto3 version supports condition check failure, False otherwise.
        """
        # Only supported in boto3 1.26.164 and above
        major, minor, *patch = map(int, boto3_version.split("."))
        return (major, minor, *patch) >= (1, 26, 164)

    def _update_record(self, data_record: DataRecord):
        logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
        update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
        expression_attr_values: Dict[str, "AttributeValueTypeDef"] = {
            ":expiry": {"N": str(data_record.expiry_timestamp)},
            ":response_data": {"S": data_record.response_data},
            ":status": {"S": data_record.status},
        }
        expression_attr_names = {
            "#expiry": self.expiry_attr,
            "#response_data": self.data_attr,
            "#status": self.status_attr,
        }

        if self.payload_validation_enabled:
            update_expression += ", #validation_key = :validation_key"
            expression_attr_values[":validation_key"] = {"S": data_record.payload_hash}
            expression_attr_names["#validation_key"] = self.validation_key_attr

        self.client.update_item(
            TableName=self.table_name,
            Key=self._get_key(data_record.idempotency_key),
            UpdateExpression=update_expression,
            ExpressionAttributeNames=expression_attr_names,
            ExpressionAttributeValues=expression_attr_values,
        )

    def _delete_record(self, data_record: DataRecord) -> None:
        logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}")
        self.client.delete_item(TableName=self.table_name, Key={**self._get_key(data_record.idempotency_key)})

Ancestors

Static methods

def boto3_supports_condition_check_failure(boto3_version: str) ‑> bool

Check if the installed boto3 version supports condition check failure.

Params

boto3_version: str The boto3 version

Returns

bool
True if the boto3 version supports condition check failure, False otherwise.

Inherited members

class IdempotencyConfig (event_key_jmespath: str = '', payload_validation_jmespath: str = '', jmespath_options: Optional[Dict] = None, raise_on_no_idempotency_key: bool = False, expires_after_seconds: int = 3600, use_local_cache: bool = False, local_cache_max_items: int = 256, hash_function: str = 'md5', lambda_context: Optional[LambdaContext] = None, response_hook: Optional[IdempotentHookFunction] = None)

Initialize the base persistence layer

Parameters

event_key_jmespath : str
A jmespath expression to extract the idempotency key from the event record
payload_validation_jmespath : str
A jmespath expression to extract the payload to be validated from the event record
raise_on_no_idempotency_key : bool, optional
Raise exception if no idempotency key was found in the request, by default False
expires_after_seconds : int
The number of seconds to wait before a record is expired
use_local_cache : bool, optional
Whether to locally cache idempotency results, by default False
local_cache_max_items : int, optional
Max number of items to store in local cache, by default 1024
hash_function : str, optional
Function to use for calculating hashes, by default md5.
lambda_context : LambdaContext, optional
Lambda Context containing information about the invocation, function and execution environment.
response_hook : IdempotentHookFunction, optional
Hook function to be called when an idempotent response is returned from the idempotent store.
Expand source code
class IdempotencyConfig:
    def __init__(
        self,
        event_key_jmespath: str = "",
        payload_validation_jmespath: str = "",
        jmespath_options: Optional[Dict] = None,
        raise_on_no_idempotency_key: bool = False,
        expires_after_seconds: int = 60 * 60,  # 1 hour default
        use_local_cache: bool = False,
        local_cache_max_items: int = 256,
        hash_function: str = "md5",
        lambda_context: Optional[LambdaContext] = None,
        response_hook: Optional[IdempotentHookFunction] = None,
    ):
        """
        Initialize the base persistence layer

        Parameters
        ----------
        event_key_jmespath: str
            A jmespath expression to extract the idempotency key from the event record
        payload_validation_jmespath: str
            A jmespath expression to extract the payload to be validated from the event record
        raise_on_no_idempotency_key: bool, optional
            Raise exception if no idempotency key was found in the request, by default False
        expires_after_seconds: int
            The number of seconds to wait before a record is expired
        use_local_cache: bool, optional
            Whether to locally cache idempotency results, by default False
        local_cache_max_items: int, optional
            Max number of items to store in local cache, by default 1024
        hash_function: str, optional
            Function to use for calculating hashes, by default md5.
        lambda_context: LambdaContext, optional
            Lambda Context containing information about the invocation, function and execution environment.
        response_hook: IdempotentHookFunction, optional
            Hook function to be called when an idempotent response is returned from the idempotent store.
        """
        self.event_key_jmespath = event_key_jmespath
        self.payload_validation_jmespath = payload_validation_jmespath
        self.jmespath_options = jmespath_options
        self.raise_on_no_idempotency_key = raise_on_no_idempotency_key
        self.expires_after_seconds = expires_after_seconds
        self.use_local_cache = use_local_cache
        self.local_cache_max_items = local_cache_max_items
        self.hash_function = hash_function
        self.lambda_context: Optional[LambdaContext] = lambda_context
        self.response_hook: Optional[IdempotentHookFunction] = response_hook

    def register_lambda_context(self, lambda_context: LambdaContext):
        """Captures the Lambda context, to calculate the remaining time before the invocation times out"""
        self.lambda_context = lambda_context

Methods

def register_lambda_context(self, lambda_context: LambdaContext)

Captures the Lambda context, to calculate the remaining time before the invocation times out

class IdempotentHookFunction (*args, **kwargs)

The IdempotentHookFunction. This class defines the calling signature for IdempotentHookFunction callbacks.

Expand source code
class IdempotentHookFunction(Protocol):
    """
    The IdempotentHookFunction.
    This class defines the calling signature for IdempotentHookFunction callbacks.
    """

    def __call__(self, response: Any, idempotent_data: DataRecord) -> Any: ...

Ancestors

  • typing.Protocol
  • typing.Generic