Module aws_lambda_powertools.utilities.idempotency.persistence.base

Persistence layers supporting idempotency

Expand source code
"""
Persistence layers supporting idempotency
"""
import datetime
import hashlib
import json
import logging
import os
import warnings
from abc import ABC, abstractmethod
from types import MappingProxyType
from typing import Any, Dict, Optional

import jmespath

from aws_lambda_powertools.shared import constants
from aws_lambda_powertools.shared.cache_dict import LRUDict
from aws_lambda_powertools.shared.json_encoder import Encoder
from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.exceptions import (
    IdempotencyInvalidStatusError,
    IdempotencyItemAlreadyExistsError,
    IdempotencyKeyError,
    IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.jmespath_utils import PowertoolsFunctions

logger = logging.getLogger(__name__)

STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"})


class DataRecord:
    """
    Data Class for idempotency records.
    """

    def __init__(
        self,
        idempotency_key: str,
        status: str = "",
        expiry_timestamp: Optional[int] = None,
        in_progress_expiry_timestamp: Optional[int] = None,
        response_data: Optional[str] = "",
        payload_hash: Optional[str] = None,
    ) -> None:
        """

        Parameters
        ----------
        idempotency_key: str
            hashed representation of the idempotent data
        status: str, optional
            status of the idempotent record
        expiry_timestamp: int, optional
            time before the record should expire, in seconds
        in_progress_expiry_timestamp: int, optional
            time before the record should expire while in the INPROGRESS state, in seconds
        payload_hash: str, optional
            hashed representation of payload
        response_data: str, optional
            response data from previous executions using the record
        """
        self.idempotency_key = idempotency_key
        self.payload_hash = payload_hash
        self.expiry_timestamp = expiry_timestamp
        self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
        self._status = status
        self.response_data = response_data

    @property
    def is_expired(self) -> bool:
        """
        Check if data record is expired

        Returns
        -------
        bool
            Whether the record is currently expired or not
        """
        return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)

    @property
    def status(self) -> str:
        """
        Get status of data record

        Returns
        -------
        str
        """
        if self.is_expired:
            return STATUS_CONSTANTS["EXPIRED"]
        elif self._status in STATUS_CONSTANTS.values():
            return self._status
        else:
            raise IdempotencyInvalidStatusError(self._status)

    def response_json_as_dict(self) -> Optional[dict]:
        """
        Get response data deserialized to python dict

        Returns
        -------
        Optional[dict]
            previous response data deserialized
        """
        return json.loads(self.response_data) if self.response_data else None


class BasePersistenceLayer(ABC):
    """
    Abstract Base Class for Idempotency persistence layer.
    """

    def __init__(self):
        """Initialize the defaults"""
        self.function_name = ""
        self.configured = False
        self.event_key_jmespath: Optional[str] = None
        self.event_key_compiled_jmespath = None
        self.jmespath_options: Optional[dict] = None
        self.payload_validation_enabled = False
        self.validation_key_jmespath = None
        self.raise_on_no_idempotency_key = False
        self.expires_after_seconds: int = 60 * 60  # 1 hour default
        self.use_local_cache = False
        self.hash_function = None

    def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) -> None:
        """
        Initialize the base persistence layer from the configuration settings

        Parameters
        ----------
        config: IdempotencyConfig
            Idempotency configuration settings
        function_name: str, Optional
            The name of the function being decorated
        """
        self.function_name = f"{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, 'test-func')}.{function_name or ''}"

        if self.configured:
            # Prevent being reconfigured multiple times
            return
        self.configured = True

        self.event_key_jmespath = config.event_key_jmespath
        if config.event_key_jmespath:
            self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath)
        self.jmespath_options = config.jmespath_options
        if not self.jmespath_options:
            self.jmespath_options = {"custom_functions": PowertoolsFunctions()}
        if config.payload_validation_jmespath:
            self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath)
            self.payload_validation_enabled = True
        self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
        self.expires_after_seconds = config.expires_after_seconds
        self.use_local_cache = config.use_local_cache
        if self.use_local_cache:
            self._cache = LRUDict(max_items=config.local_cache_max_items)
        self.hash_function = getattr(hashlib, config.hash_function)

    def _get_hashed_idempotency_key(self, data: Dict[str, Any]) -> str:
        """
        Extract idempotency key and return a hashed representation

        Parameters
        ----------
        data: Dict[str, Any]
            Incoming data

        Returns
        -------
        str
            Hashed representation of the data extracted by the jmespath expression

        """
        if self.event_key_jmespath:
            data = self.event_key_compiled_jmespath.search(data, options=jmespath.Options(**self.jmespath_options))

        if self.is_missing_idempotency_key(data=data):
            if self.raise_on_no_idempotency_key:
                raise IdempotencyKeyError("No data found to create a hashed idempotency_key")
            warnings.warn(f"No value found for idempotency_key. jmespath: {self.event_key_jmespath}")

        generated_hash = self._generate_hash(data=data)
        return f"{self.function_name}#{generated_hash}"

    @staticmethod
    def is_missing_idempotency_key(data) -> bool:
        if type(data).__name__ in ("tuple", "list", "dict"):
            return all(x is None for x in data)
        return not data

    def _get_hashed_payload(self, data: Dict[str, Any]) -> str:
        """
        Extract payload using validation key jmespath and return a hashed representation

        Parameters
        ----------
        data: Dict[str, Any]
            Payload

        Returns
        -------
        str
            Hashed representation of the data extracted by the jmespath expression

        """
        if not self.payload_validation_enabled:
            return ""
        data = self.validation_key_jmespath.search(data)
        return self._generate_hash(data=data)

    def _generate_hash(self, data: Any) -> str:
        """
        Generate a hash value from the provided data

        Parameters
        ----------
        data: Any
            The data to hash

        Returns
        -------
        str
            Hashed representation of the provided data

        """
        hashed_data = self.hash_function(json.dumps(data, cls=Encoder, sort_keys=True).encode())
        return hashed_data.hexdigest()

    def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> None:
        """
        Validate that the hashed payload matches data provided and stored data record

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        data_record: DataRecord
            DataRecord instance

        Raises
        ----------
        IdempotencyValidationError
            Payload doesn't match the stored record for the given idempotency key

        """
        if self.payload_validation_enabled:
            data_hash = self._get_hashed_payload(data=data)
            if data_record.payload_hash != data_hash:
                raise IdempotencyValidationError("Payload does not match stored record for this event key")

    def _get_expiry_timestamp(self) -> int:
        """

        Returns
        -------
        int
            unix timestamp of expiry date for idempotency record

        """
        now = datetime.datetime.now()
        period = datetime.timedelta(seconds=self.expires_after_seconds)
        return int((now + period).timestamp())

    def _save_to_cache(self, data_record: DataRecord):
        """
        Save data_record to local cache except when status is "INPROGRESS"

        NOTE: We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the
        execution environment

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance

        Returns
        -------

        """
        if not self.use_local_cache:
            return
        if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
            return
        self._cache[data_record.idempotency_key] = data_record

    def _retrieve_from_cache(self, idempotency_key: str):
        if not self.use_local_cache:
            return
        cached_record = self._cache.get(key=idempotency_key)
        if cached_record:
            if not cached_record.is_expired:
                return cached_record
            logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}")
            self._delete_from_cache(idempotency_key=idempotency_key)

    def _delete_from_cache(self, idempotency_key: str):
        if not self.use_local_cache:
            return
        if idempotency_key in self._cache:
            del self._cache[idempotency_key]

    def save_success(self, data: Dict[str, Any], result: dict) -> None:
        """
        Save record of function's execution completing successfully

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        result: dict
            The response from function
        """
        response_data = json.dumps(result, cls=Encoder, sort_keys=True)

        data_record = DataRecord(
            idempotency_key=self._get_hashed_idempotency_key(data=data),
            status=STATUS_CONSTANTS["COMPLETED"],
            expiry_timestamp=self._get_expiry_timestamp(),
            response_data=response_data,
            payload_hash=self._get_hashed_payload(data=data),
        )
        logger.debug(
            f"Function successfully executed. Saving record to persistence store with "
            f"idempotency key: {data_record.idempotency_key}"
        )
        self._update_record(data_record=data_record)

        self._save_to_cache(data_record=data_record)

    def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
        """
        Save record of function's execution being in progress

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        remaining_time_in_millis: Optional[int]
            If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
        """
        data_record = DataRecord(
            idempotency_key=self._get_hashed_idempotency_key(data=data),
            status=STATUS_CONSTANTS["INPROGRESS"],
            expiry_timestamp=self._get_expiry_timestamp(),
            payload_hash=self._get_hashed_payload(data=data),
        )

        if remaining_time_in_millis:
            now = datetime.datetime.now()
            period = datetime.timedelta(milliseconds=remaining_time_in_millis)
            timestamp = (now + period).timestamp()

            data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
        else:
            warnings.warn(
                "Couldn't determine the remaining time left. "
                "Did you call register_lambda_context on IdempotencyConfig?"
            )

        logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

        if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
            raise IdempotencyItemAlreadyExistsError

        self._put_record(data_record=data_record)

    def delete_record(self, data: Dict[str, Any], exception: Exception):
        """
        Delete record from the persistence store

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        exception
            The exception raised by the function
        """
        data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(data=data))

        logger.debug(
            f"Function raised an exception ({type(exception).__name__}). Clearing in progress record in persistence "
            f"store for idempotency key: {data_record.idempotency_key}"
        )
        self._delete_record(data_record=data_record)

        self._delete_from_cache(idempotency_key=data_record.idempotency_key)

    def get_record(self, data: Dict[str, Any]) -> DataRecord:
        """
        Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

        Parameters
        ----------
        data: Dict[str, Any]
            Payload

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        IdempotencyValidationError
            Payload doesn't match the stored record for the given idempotency key
        """

        idempotency_key = self._get_hashed_idempotency_key(data=data)

        cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
        if cached_record:
            logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
            self._validate_payload(data=data, data_record=cached_record)
            return cached_record

        record = self._get_record(idempotency_key=idempotency_key)

        self._save_to_cache(data_record=record)

        self._validate_payload(data=data, data_record=record)
        return record

    @abstractmethod
    def _get_record(self, idempotency_key) -> DataRecord:
        """
        Retrieve item from persistence store using idempotency key and return it as a DataRecord instance.

        Parameters
        ----------
        idempotency_key

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        """
        raise NotImplementedError

    @abstractmethod
    def _put_record(self, data_record: DataRecord) -> None:
        """
        Add a DataRecord to persistence store if it does not already exist with that key. Raise ItemAlreadyExists
        if a non-expired entry already exists.

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

    @abstractmethod
    def _update_record(self, data_record: DataRecord) -> None:
        """
        Update item in persistence store

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

    @abstractmethod
    def _delete_record(self, data_record: DataRecord) -> None:
        """
        Remove item from persistence store
        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

Classes

class BasePersistenceLayer

Abstract Base Class for Idempotency persistence layer.

Initialize the defaults

Expand source code
class BasePersistenceLayer(ABC):
    """
    Abstract Base Class for Idempotency persistence layer.
    """

    def __init__(self):
        """Initialize the defaults"""
        self.function_name = ""
        self.configured = False
        self.event_key_jmespath: Optional[str] = None
        self.event_key_compiled_jmespath = None
        self.jmespath_options: Optional[dict] = None
        self.payload_validation_enabled = False
        self.validation_key_jmespath = None
        self.raise_on_no_idempotency_key = False
        self.expires_after_seconds: int = 60 * 60  # 1 hour default
        self.use_local_cache = False
        self.hash_function = None

    def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) -> None:
        """
        Initialize the base persistence layer from the configuration settings

        Parameters
        ----------
        config: IdempotencyConfig
            Idempotency configuration settings
        function_name: str, Optional
            The name of the function being decorated
        """
        self.function_name = f"{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, 'test-func')}.{function_name or ''}"

        if self.configured:
            # Prevent being reconfigured multiple times
            return
        self.configured = True

        self.event_key_jmespath = config.event_key_jmespath
        if config.event_key_jmespath:
            self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath)
        self.jmespath_options = config.jmespath_options
        if not self.jmespath_options:
            self.jmespath_options = {"custom_functions": PowertoolsFunctions()}
        if config.payload_validation_jmespath:
            self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath)
            self.payload_validation_enabled = True
        self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
        self.expires_after_seconds = config.expires_after_seconds
        self.use_local_cache = config.use_local_cache
        if self.use_local_cache:
            self._cache = LRUDict(max_items=config.local_cache_max_items)
        self.hash_function = getattr(hashlib, config.hash_function)

    def _get_hashed_idempotency_key(self, data: Dict[str, Any]) -> str:
        """
        Extract idempotency key and return a hashed representation

        Parameters
        ----------
        data: Dict[str, Any]
            Incoming data

        Returns
        -------
        str
            Hashed representation of the data extracted by the jmespath expression

        """
        if self.event_key_jmespath:
            data = self.event_key_compiled_jmespath.search(data, options=jmespath.Options(**self.jmespath_options))

        if self.is_missing_idempotency_key(data=data):
            if self.raise_on_no_idempotency_key:
                raise IdempotencyKeyError("No data found to create a hashed idempotency_key")
            warnings.warn(f"No value found for idempotency_key. jmespath: {self.event_key_jmespath}")

        generated_hash = self._generate_hash(data=data)
        return f"{self.function_name}#{generated_hash}"

    @staticmethod
    def is_missing_idempotency_key(data) -> bool:
        if type(data).__name__ in ("tuple", "list", "dict"):
            return all(x is None for x in data)
        return not data

    def _get_hashed_payload(self, data: Dict[str, Any]) -> str:
        """
        Extract payload using validation key jmespath and return a hashed representation

        Parameters
        ----------
        data: Dict[str, Any]
            Payload

        Returns
        -------
        str
            Hashed representation of the data extracted by the jmespath expression

        """
        if not self.payload_validation_enabled:
            return ""
        data = self.validation_key_jmespath.search(data)
        return self._generate_hash(data=data)

    def _generate_hash(self, data: Any) -> str:
        """
        Generate a hash value from the provided data

        Parameters
        ----------
        data: Any
            The data to hash

        Returns
        -------
        str
            Hashed representation of the provided data

        """
        hashed_data = self.hash_function(json.dumps(data, cls=Encoder, sort_keys=True).encode())
        return hashed_data.hexdigest()

    def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> None:
        """
        Validate that the hashed payload matches data provided and stored data record

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        data_record: DataRecord
            DataRecord instance

        Raises
        ----------
        IdempotencyValidationError
            Payload doesn't match the stored record for the given idempotency key

        """
        if self.payload_validation_enabled:
            data_hash = self._get_hashed_payload(data=data)
            if data_record.payload_hash != data_hash:
                raise IdempotencyValidationError("Payload does not match stored record for this event key")

    def _get_expiry_timestamp(self) -> int:
        """

        Returns
        -------
        int
            unix timestamp of expiry date for idempotency record

        """
        now = datetime.datetime.now()
        period = datetime.timedelta(seconds=self.expires_after_seconds)
        return int((now + period).timestamp())

    def _save_to_cache(self, data_record: DataRecord):
        """
        Save data_record to local cache except when status is "INPROGRESS"

        NOTE: We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the
        execution environment

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance

        Returns
        -------

        """
        if not self.use_local_cache:
            return
        if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
            return
        self._cache[data_record.idempotency_key] = data_record

    def _retrieve_from_cache(self, idempotency_key: str):
        if not self.use_local_cache:
            return
        cached_record = self._cache.get(key=idempotency_key)
        if cached_record:
            if not cached_record.is_expired:
                return cached_record
            logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}")
            self._delete_from_cache(idempotency_key=idempotency_key)

    def _delete_from_cache(self, idempotency_key: str):
        if not self.use_local_cache:
            return
        if idempotency_key in self._cache:
            del self._cache[idempotency_key]

    def save_success(self, data: Dict[str, Any], result: dict) -> None:
        """
        Save record of function's execution completing successfully

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        result: dict
            The response from function
        """
        response_data = json.dumps(result, cls=Encoder, sort_keys=True)

        data_record = DataRecord(
            idempotency_key=self._get_hashed_idempotency_key(data=data),
            status=STATUS_CONSTANTS["COMPLETED"],
            expiry_timestamp=self._get_expiry_timestamp(),
            response_data=response_data,
            payload_hash=self._get_hashed_payload(data=data),
        )
        logger.debug(
            f"Function successfully executed. Saving record to persistence store with "
            f"idempotency key: {data_record.idempotency_key}"
        )
        self._update_record(data_record=data_record)

        self._save_to_cache(data_record=data_record)

    def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
        """
        Save record of function's execution being in progress

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        remaining_time_in_millis: Optional[int]
            If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
        """
        data_record = DataRecord(
            idempotency_key=self._get_hashed_idempotency_key(data=data),
            status=STATUS_CONSTANTS["INPROGRESS"],
            expiry_timestamp=self._get_expiry_timestamp(),
            payload_hash=self._get_hashed_payload(data=data),
        )

        if remaining_time_in_millis:
            now = datetime.datetime.now()
            period = datetime.timedelta(milliseconds=remaining_time_in_millis)
            timestamp = (now + period).timestamp()

            data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
        else:
            warnings.warn(
                "Couldn't determine the remaining time left. "
                "Did you call register_lambda_context on IdempotencyConfig?"
            )

        logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

        if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
            raise IdempotencyItemAlreadyExistsError

        self._put_record(data_record=data_record)

    def delete_record(self, data: Dict[str, Any], exception: Exception):
        """
        Delete record from the persistence store

        Parameters
        ----------
        data: Dict[str, Any]
            Payload
        exception
            The exception raised by the function
        """
        data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(data=data))

        logger.debug(
            f"Function raised an exception ({type(exception).__name__}). Clearing in progress record in persistence "
            f"store for idempotency key: {data_record.idempotency_key}"
        )
        self._delete_record(data_record=data_record)

        self._delete_from_cache(idempotency_key=data_record.idempotency_key)

    def get_record(self, data: Dict[str, Any]) -> DataRecord:
        """
        Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

        Parameters
        ----------
        data: Dict[str, Any]
            Payload

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        IdempotencyValidationError
            Payload doesn't match the stored record for the given idempotency key
        """

        idempotency_key = self._get_hashed_idempotency_key(data=data)

        cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
        if cached_record:
            logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
            self._validate_payload(data=data, data_record=cached_record)
            return cached_record

        record = self._get_record(idempotency_key=idempotency_key)

        self._save_to_cache(data_record=record)

        self._validate_payload(data=data, data_record=record)
        return record

    @abstractmethod
    def _get_record(self, idempotency_key) -> DataRecord:
        """
        Retrieve item from persistence store using idempotency key and return it as a DataRecord instance.

        Parameters
        ----------
        idempotency_key

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        """
        raise NotImplementedError

    @abstractmethod
    def _put_record(self, data_record: DataRecord) -> None:
        """
        Add a DataRecord to persistence store if it does not already exist with that key. Raise ItemAlreadyExists
        if a non-expired entry already exists.

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

    @abstractmethod
    def _update_record(self, data_record: DataRecord) -> None:
        """
        Update item in persistence store

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

    @abstractmethod
    def _delete_record(self, data_record: DataRecord) -> None:
        """
        Remove item from persistence store
        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """

        raise NotImplementedError

Ancestors

  • abc.ABC

Subclasses

Static methods

def is_missing_idempotency_key(data) ‑> bool
Expand source code
@staticmethod
def is_missing_idempotency_key(data) -> bool:
    if type(data).__name__ in ("tuple", "list", "dict"):
        return all(x is None for x in data)
    return not data

Methods

def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) ‑> None

Initialize the base persistence layer from the configuration settings

Parameters

config : IdempotencyConfig
Idempotency configuration settings
function_name : str, Optional
The name of the function being decorated
Expand source code
def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) -> None:
    """
    Initialize the base persistence layer from the configuration settings

    Parameters
    ----------
    config: IdempotencyConfig
        Idempotency configuration settings
    function_name: str, Optional
        The name of the function being decorated
    """
    self.function_name = f"{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, 'test-func')}.{function_name or ''}"

    if self.configured:
        # Prevent being reconfigured multiple times
        return
    self.configured = True

    self.event_key_jmespath = config.event_key_jmespath
    if config.event_key_jmespath:
        self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath)
    self.jmespath_options = config.jmespath_options
    if not self.jmespath_options:
        self.jmespath_options = {"custom_functions": PowertoolsFunctions()}
    if config.payload_validation_jmespath:
        self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath)
        self.payload_validation_enabled = True
    self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
    self.expires_after_seconds = config.expires_after_seconds
    self.use_local_cache = config.use_local_cache
    if self.use_local_cache:
        self._cache = LRUDict(max_items=config.local_cache_max_items)
    self.hash_function = getattr(hashlib, config.hash_function)
def delete_record(self, data: Dict[str, Any], exception: Exception)

Delete record from the persistence store

Parameters

data : Dict[str, Any]
Payload
exception
The exception raised by the function
Expand source code
def delete_record(self, data: Dict[str, Any], exception: Exception):
    """
    Delete record from the persistence store

    Parameters
    ----------
    data: Dict[str, Any]
        Payload
    exception
        The exception raised by the function
    """
    data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(data=data))

    logger.debug(
        f"Function raised an exception ({type(exception).__name__}). Clearing in progress record in persistence "
        f"store for idempotency key: {data_record.idempotency_key}"
    )
    self._delete_record(data_record=data_record)

    self._delete_from_cache(idempotency_key=data_record.idempotency_key)
def get_record(self, data: Dict[str, Any]) ‑> DataRecord

Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

Parameters

data : Dict[str, Any]
Payload

Returns

DataRecord
DataRecord representation of existing record found in persistence store

Raises

IdempotencyItemNotFoundError
Exception raised if no record exists in persistence store with the idempotency key
IdempotencyValidationError
Payload doesn't match the stored record for the given idempotency key
Expand source code
def get_record(self, data: Dict[str, Any]) -> DataRecord:
    """
    Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

    Parameters
    ----------
    data: Dict[str, Any]
        Payload

    Returns
    -------
    DataRecord
        DataRecord representation of existing record found in persistence store

    Raises
    ------
    IdempotencyItemNotFoundError
        Exception raised if no record exists in persistence store with the idempotency key
    IdempotencyValidationError
        Payload doesn't match the stored record for the given idempotency key
    """

    idempotency_key = self._get_hashed_idempotency_key(data=data)

    cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
    if cached_record:
        logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
        self._validate_payload(data=data, data_record=cached_record)
        return cached_record

    record = self._get_record(idempotency_key=idempotency_key)

    self._save_to_cache(data_record=record)

    self._validate_payload(data=data, data_record=record)
    return record
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) ‑> None

Save record of function's execution being in progress

Parameters

data : Dict[str, Any]
Payload
remaining_time_in_millis : Optional[int]
If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
Expand source code
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
    """
    Save record of function's execution being in progress

    Parameters
    ----------
    data: Dict[str, Any]
        Payload
    remaining_time_in_millis: Optional[int]
        If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
    """
    data_record = DataRecord(
        idempotency_key=self._get_hashed_idempotency_key(data=data),
        status=STATUS_CONSTANTS["INPROGRESS"],
        expiry_timestamp=self._get_expiry_timestamp(),
        payload_hash=self._get_hashed_payload(data=data),
    )

    if remaining_time_in_millis:
        now = datetime.datetime.now()
        period = datetime.timedelta(milliseconds=remaining_time_in_millis)
        timestamp = (now + period).timestamp()

        data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
    else:
        warnings.warn(
            "Couldn't determine the remaining time left. "
            "Did you call register_lambda_context on IdempotencyConfig?"
        )

    logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

    if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
        raise IdempotencyItemAlreadyExistsError

    self._put_record(data_record=data_record)
def save_success(self, data: Dict[str, Any], result: dict) ‑> None

Save record of function's execution completing successfully

Parameters

data : Dict[str, Any]
Payload
result : dict
The response from function
Expand source code
def save_success(self, data: Dict[str, Any], result: dict) -> None:
    """
    Save record of function's execution completing successfully

    Parameters
    ----------
    data: Dict[str, Any]
        Payload
    result: dict
        The response from function
    """
    response_data = json.dumps(result, cls=Encoder, sort_keys=True)

    data_record = DataRecord(
        idempotency_key=self._get_hashed_idempotency_key(data=data),
        status=STATUS_CONSTANTS["COMPLETED"],
        expiry_timestamp=self._get_expiry_timestamp(),
        response_data=response_data,
        payload_hash=self._get_hashed_payload(data=data),
    )
    logger.debug(
        f"Function successfully executed. Saving record to persistence store with "
        f"idempotency key: {data_record.idempotency_key}"
    )
    self._update_record(data_record=data_record)

    self._save_to_cache(data_record=data_record)
class DataRecord (idempotency_key: str, status: str = '', expiry_timestamp: Optional[int] = None, in_progress_expiry_timestamp: Optional[int] = None, response_data: Optional[str] = '', payload_hash: Optional[str] = None)

Data Class for idempotency records.

Parameters

idempotency_key : str
hashed representation of the idempotent data
status : str, optional
status of the idempotent record
expiry_timestamp : int, optional
time before the record should expire, in seconds
in_progress_expiry_timestamp : int, optional
time before the record should expire while in the INPROGRESS state, in seconds
payload_hash : str, optional
hashed representation of payload
response_data : str, optional
response data from previous executions using the record
Expand source code
class DataRecord:
    """
    Data Class for idempotency records.
    """

    def __init__(
        self,
        idempotency_key: str,
        status: str = "",
        expiry_timestamp: Optional[int] = None,
        in_progress_expiry_timestamp: Optional[int] = None,
        response_data: Optional[str] = "",
        payload_hash: Optional[str] = None,
    ) -> None:
        """

        Parameters
        ----------
        idempotency_key: str
            hashed representation of the idempotent data
        status: str, optional
            status of the idempotent record
        expiry_timestamp: int, optional
            time before the record should expire, in seconds
        in_progress_expiry_timestamp: int, optional
            time before the record should expire while in the INPROGRESS state, in seconds
        payload_hash: str, optional
            hashed representation of payload
        response_data: str, optional
            response data from previous executions using the record
        """
        self.idempotency_key = idempotency_key
        self.payload_hash = payload_hash
        self.expiry_timestamp = expiry_timestamp
        self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
        self._status = status
        self.response_data = response_data

    @property
    def is_expired(self) -> bool:
        """
        Check if data record is expired

        Returns
        -------
        bool
            Whether the record is currently expired or not
        """
        return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)

    @property
    def status(self) -> str:
        """
        Get status of data record

        Returns
        -------
        str
        """
        if self.is_expired:
            return STATUS_CONSTANTS["EXPIRED"]
        elif self._status in STATUS_CONSTANTS.values():
            return self._status
        else:
            raise IdempotencyInvalidStatusError(self._status)

    def response_json_as_dict(self) -> Optional[dict]:
        """
        Get response data deserialized to python dict

        Returns
        -------
        Optional[dict]
            previous response data deserialized
        """
        return json.loads(self.response_data) if self.response_data else None

Instance variables

var is_expired : bool

Check if data record is expired

Returns

bool
Whether the record is currently expired or not
Expand source code
@property
def is_expired(self) -> bool:
    """
    Check if data record is expired

    Returns
    -------
    bool
        Whether the record is currently expired or not
    """
    return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)
var status : str

Get status of data record

Returns

str
 
Expand source code
@property
def status(self) -> str:
    """
    Get status of data record

    Returns
    -------
    str
    """
    if self.is_expired:
        return STATUS_CONSTANTS["EXPIRED"]
    elif self._status in STATUS_CONSTANTS.values():
        return self._status
    else:
        raise IdempotencyInvalidStatusError(self._status)

Methods

def response_json_as_dict(self) ‑> Optional[dict]

Get response data deserialized to python dict

Returns

Optional[dict]
previous response data deserialized
Expand source code
def response_json_as_dict(self) -> Optional[dict]:
    """
    Get response data deserialized to python dict

    Returns
    -------
    Optional[dict]
        previous response data deserialized
    """
    return json.loads(self.response_data) if self.response_data else None