Module aws_lambda_powertools.utilities.idempotency.persistence.base
Persistence layers supporting idempotency
Expand source code
"""
Persistence layers supporting idempotency
"""
import datetime
import hashlib
import json
import logging
import warnings
from abc import ABC, abstractmethod
from types import MappingProxyType
from typing import Any, Dict, Optional
import jmespath
from aws_lambda_powertools.shared.cache_dict import LRUDict
from aws_lambda_powertools.shared.jmespath_functions import PowertoolsFunctions
from aws_lambda_powertools.shared.json_encoder import Encoder
from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyInvalidStatusError,
IdempotencyItemAlreadyExistsError,
IdempotencyKeyError,
IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
logger = logging.getLogger(__name__)
STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"})
class DataRecord:
"""
Data Class for idempotency records.
"""
def __init__(
self,
idempotency_key,
status: str = "",
expiry_timestamp: int = None,
response_data: Optional[str] = "",
payload_hash: str = None,
) -> None:
"""
Parameters
----------
idempotency_key: str
hashed representation of the idempotent data
status: str, optional
status of the idempotent record
expiry_timestamp: int, optional
time before the record should expire, in seconds
payload_hash: str, optional
hashed representation of payload
response_data: str, optional
response data from previous executions using the record
"""
self.idempotency_key = idempotency_key
self.payload_hash = payload_hash
self.expiry_timestamp = expiry_timestamp
self._status = status
self.response_data = response_data
@property
def is_expired(self) -> bool:
"""
Check if data record is expired
Returns
-------
bool
Whether the record is currently expired or not
"""
return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)
@property
def status(self) -> str:
"""
Get status of data record
Returns
-------
str
"""
if self.is_expired:
return STATUS_CONSTANTS["EXPIRED"]
elif self._status in STATUS_CONSTANTS.values():
return self._status
else:
raise IdempotencyInvalidStatusError(self._status)
def response_json_as_dict(self) -> dict:
"""
Get response data deserialized to python dict
Returns
-------
dict
previous response data deserialized
"""
return json.loads(self.response_data)
class BasePersistenceLayer(ABC):
"""
Abstract Base Class for Idempotency persistence layer.
"""
def __init__(self):
"""Initialize the defaults """
self.configured = False
self.event_key_jmespath: Optional[str] = None
self.event_key_compiled_jmespath = None
self.jmespath_options: Optional[dict] = None
self.payload_validation_enabled = False
self.validation_key_jmespath = None
self.raise_on_no_idempotency_key = False
self.expires_after_seconds: int = 60 * 60 # 1 hour default
self.use_local_cache = False
self._cache: Optional[LRUDict] = None
self.hash_function = None
def configure(self, config: IdempotencyConfig) -> None:
"""
Initialize the base persistence layer from the configuration settings
Parameters
----------
config: IdempotencyConfig
Idempotency configuration settings
"""
if self.configured:
# Prevent being reconfigured multiple times
return
self.configured = True
self.event_key_jmespath = config.event_key_jmespath
if config.event_key_jmespath:
self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath)
self.jmespath_options = config.jmespath_options
if not self.jmespath_options:
self.jmespath_options = {"custom_functions": PowertoolsFunctions()}
if config.payload_validation_jmespath:
self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath)
self.payload_validation_enabled = True
self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
self.expires_after_seconds = config.expires_after_seconds
self.use_local_cache = config.use_local_cache
if self.use_local_cache:
self._cache = LRUDict(max_items=config.local_cache_max_items)
self.hash_function = getattr(hashlib, config.hash_function)
def _get_hashed_idempotency_key(self, event: Dict[str, Any], context: LambdaContext) -> str:
"""
Extract data from lambda event using event key jmespath, and return a hashed representation
Parameters
----------
event: Dict[str, Any]
Lambda event
context: LambdaContext
Lambda context
Returns
-------
str
Hashed representation of the data extracted by the jmespath expression
"""
data = event
if self.event_key_jmespath:
data = self.event_key_compiled_jmespath.search(event, options=jmespath.Options(**self.jmespath_options))
if self.is_missing_idempotency_key(data):
if self.raise_on_no_idempotency_key:
raise IdempotencyKeyError("No data found to create a hashed idempotency_key")
warnings.warn(f"No value found for idempotency_key. jmespath: {self.event_key_jmespath}")
generated_hash = self._generate_hash(data)
return f"{context.function_name}#{generated_hash}"
@staticmethod
def is_missing_idempotency_key(data) -> bool:
if type(data).__name__ in ("tuple", "list", "dict"):
return all(x is None for x in data)
return not data
def _get_hashed_payload(self, lambda_event: Dict[str, Any]) -> str:
"""
Extract data from lambda event using validation key jmespath, and return a hashed representation
Parameters
----------
lambda_event: Dict[str, Any]
Lambda event
Returns
-------
str
Hashed representation of the data extracted by the jmespath expression
"""
if not self.payload_validation_enabled:
return ""
data = self.validation_key_jmespath.search(lambda_event)
return self._generate_hash(data)
def _generate_hash(self, data: Any) -> str:
"""
Generate a hash value from the provided data
Parameters
----------
data: Any
The data to hash
Returns
-------
str
Hashed representation of the provided data
"""
hashed_data = self.hash_function(json.dumps(data, cls=Encoder).encode())
return hashed_data.hexdigest()
def _validate_payload(self, lambda_event: Dict[str, Any], data_record: DataRecord) -> None:
"""
Validate that the hashed payload matches in the lambda event and stored data record
Parameters
----------
lambda_event: Dict[str, Any]
Lambda event
data_record: DataRecord
DataRecord instance
Raises
----------
IdempotencyValidationError
Event payload doesn't match the stored record for the given idempotency key
"""
if self.payload_validation_enabled:
lambda_payload_hash = self._get_hashed_payload(lambda_event)
if data_record.payload_hash != lambda_payload_hash:
raise IdempotencyValidationError("Payload does not match stored record for this event key")
def _get_expiry_timestamp(self) -> int:
"""
Returns
-------
int
unix timestamp of expiry date for idempotency record
"""
now = datetime.datetime.now()
period = datetime.timedelta(seconds=self.expires_after_seconds)
return int((now + period).timestamp())
def _save_to_cache(self, data_record: DataRecord):
"""
Save data_record to local cache except when status is "INPROGRESS"
NOTE: We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the
execution environment
Parameters
----------
data_record: DataRecord
DataRecord instance
Returns
-------
"""
if not self.use_local_cache:
return
if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
return
self._cache[data_record.idempotency_key] = data_record
def _retrieve_from_cache(self, idempotency_key: str):
if not self.use_local_cache:
return
cached_record = self._cache.get(idempotency_key)
if cached_record:
if not cached_record.is_expired:
return cached_record
logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}")
self._delete_from_cache(idempotency_key)
def _delete_from_cache(self, idempotency_key: str):
if not self.use_local_cache:
return
if idempotency_key in self._cache:
del self._cache[idempotency_key]
def save_success(self, event: Dict[str, Any], context: LambdaContext, result: dict) -> None:
"""
Save record of function's execution completing successfully
Parameters
----------
event: Dict[str, Any]
Lambda event
context: LambdaContext
Lambda context
result: dict
The response from lambda handler
"""
response_data = json.dumps(result, cls=Encoder)
data_record = DataRecord(
idempotency_key=self._get_hashed_idempotency_key(event, context),
status=STATUS_CONSTANTS["COMPLETED"],
expiry_timestamp=self._get_expiry_timestamp(),
response_data=response_data,
payload_hash=self._get_hashed_payload(event),
)
logger.debug(
f"Lambda successfully executed. Saving record to persistence store with "
f"idempotency key: {data_record.idempotency_key}"
)
self._update_record(data_record=data_record)
self._save_to_cache(data_record)
def save_inprogress(self, event: Dict[str, Any], context: LambdaContext) -> None:
"""
Save record of function's execution being in progress
Parameters
----------
event: Dict[str, Any]
Lambda event
context: LambdaContext
Lambda context
"""
data_record = DataRecord(
idempotency_key=self._get_hashed_idempotency_key(event, context),
status=STATUS_CONSTANTS["INPROGRESS"],
expiry_timestamp=self._get_expiry_timestamp(),
payload_hash=self._get_hashed_payload(event),
)
logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")
if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
raise IdempotencyItemAlreadyExistsError
self._put_record(data_record)
def delete_record(self, event: Dict[str, Any], context: LambdaContext, exception: Exception):
"""
Delete record from the persistence store
Parameters
----------
event: Dict[str, Any]
Lambda event
context: LambdaContext
Lambda context
exception
The exception raised by the lambda handler
"""
data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(event, context))
logger.debug(
f"Lambda raised an exception ({type(exception).__name__}). Clearing in progress record in persistence "
f"store for idempotency key: {data_record.idempotency_key}"
)
self._delete_record(data_record)
self._delete_from_cache(data_record.idempotency_key)
def get_record(self, event: Dict[str, Any], context: LambdaContext) -> DataRecord:
"""
Calculate idempotency key for lambda_event, then retrieve item from persistence store using idempotency key
and return it as a DataRecord instance.and return it as a DataRecord instance.
Parameters
----------
event: Dict[str, Any]
Lambda event
context: LambdaContext
Lambda context
Returns
-------
DataRecord
DataRecord representation of existing record found in persistence store
Raises
------
IdempotencyItemNotFoundError
Exception raised if no record exists in persistence store with the idempotency key
IdempotencyValidationError
Event payload doesn't match the stored record for the given idempotency key
"""
idempotency_key = self._get_hashed_idempotency_key(event, context)
cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
if cached_record:
logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
self._validate_payload(event, cached_record)
return cached_record
record = self._get_record(idempotency_key)
self._save_to_cache(data_record=record)
self._validate_payload(event, record)
return record
@abstractmethod
def _get_record(self, idempotency_key) -> DataRecord:
"""
Retrieve item from persistence store using idempotency key and return it as a DataRecord instance.
Parameters
----------
idempotency_key
Returns
-------
DataRecord
DataRecord representation of existing record found in persistence store
Raises
------
IdempotencyItemNotFoundError
Exception raised if no record exists in persistence store with the idempotency key
"""
raise NotImplementedError
@abstractmethod
def _put_record(self, data_record: DataRecord) -> None:
"""
Add a DataRecord to persistence store if it does not already exist with that key. Raise ItemAlreadyExists
if a non-expired entry already exists.
Parameters
----------
data_record: DataRecord
DataRecord instance
"""
raise NotImplementedError
@abstractmethod
def _update_record(self, data_record: DataRecord) -> None:
"""
Update item in persistence store
Parameters
----------
data_record: DataRecord
DataRecord instance
"""
raise NotImplementedError
@abstractmethod
def _delete_record(self, data_record: DataRecord) -> None:
"""
Remove item from persistence store
Parameters
----------
data_record: DataRecord
DataRecord instance
"""
raise NotImplementedError
Classes
class BasePersistenceLayer
-
Abstract Base Class for Idempotency persistence layer.
Initialize the defaults
Expand source code
class BasePersistenceLayer(ABC): """ Abstract Base Class for Idempotency persistence layer. """ def __init__(self): """Initialize the defaults """ self.configured = False self.event_key_jmespath: Optional[str] = None self.event_key_compiled_jmespath = None self.jmespath_options: Optional[dict] = None self.payload_validation_enabled = False self.validation_key_jmespath = None self.raise_on_no_idempotency_key = False self.expires_after_seconds: int = 60 * 60 # 1 hour default self.use_local_cache = False self._cache: Optional[LRUDict] = None self.hash_function = None def configure(self, config: IdempotencyConfig) -> None: """ Initialize the base persistence layer from the configuration settings Parameters ---------- config: IdempotencyConfig Idempotency configuration settings """ if self.configured: # Prevent being reconfigured multiple times return self.configured = True self.event_key_jmespath = config.event_key_jmespath if config.event_key_jmespath: self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath) self.jmespath_options = config.jmespath_options if not self.jmespath_options: self.jmespath_options = {"custom_functions": PowertoolsFunctions()} if config.payload_validation_jmespath: self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath) self.payload_validation_enabled = True self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key self.expires_after_seconds = config.expires_after_seconds self.use_local_cache = config.use_local_cache if self.use_local_cache: self._cache = LRUDict(max_items=config.local_cache_max_items) self.hash_function = getattr(hashlib, config.hash_function) def _get_hashed_idempotency_key(self, event: Dict[str, Any], context: LambdaContext) -> str: """ Extract data from lambda event using event key jmespath, and return a hashed representation Parameters ---------- event: Dict[str, Any] Lambda event context: LambdaContext Lambda context Returns ------- str Hashed representation of the data extracted by the jmespath expression """ data = event if self.event_key_jmespath: data = self.event_key_compiled_jmespath.search(event, options=jmespath.Options(**self.jmespath_options)) if self.is_missing_idempotency_key(data): if self.raise_on_no_idempotency_key: raise IdempotencyKeyError("No data found to create a hashed idempotency_key") warnings.warn(f"No value found for idempotency_key. jmespath: {self.event_key_jmespath}") generated_hash = self._generate_hash(data) return f"{context.function_name}#{generated_hash}" @staticmethod def is_missing_idempotency_key(data) -> bool: if type(data).__name__ in ("tuple", "list", "dict"): return all(x is None for x in data) return not data def _get_hashed_payload(self, lambda_event: Dict[str, Any]) -> str: """ Extract data from lambda event using validation key jmespath, and return a hashed representation Parameters ---------- lambda_event: Dict[str, Any] Lambda event Returns ------- str Hashed representation of the data extracted by the jmespath expression """ if not self.payload_validation_enabled: return "" data = self.validation_key_jmespath.search(lambda_event) return self._generate_hash(data) def _generate_hash(self, data: Any) -> str: """ Generate a hash value from the provided data Parameters ---------- data: Any The data to hash Returns ------- str Hashed representation of the provided data """ hashed_data = self.hash_function(json.dumps(data, cls=Encoder).encode()) return hashed_data.hexdigest() def _validate_payload(self, lambda_event: Dict[str, Any], data_record: DataRecord) -> None: """ Validate that the hashed payload matches in the lambda event and stored data record Parameters ---------- lambda_event: Dict[str, Any] Lambda event data_record: DataRecord DataRecord instance Raises ---------- IdempotencyValidationError Event payload doesn't match the stored record for the given idempotency key """ if self.payload_validation_enabled: lambda_payload_hash = self._get_hashed_payload(lambda_event) if data_record.payload_hash != lambda_payload_hash: raise IdempotencyValidationError("Payload does not match stored record for this event key") def _get_expiry_timestamp(self) -> int: """ Returns ------- int unix timestamp of expiry date for idempotency record """ now = datetime.datetime.now() period = datetime.timedelta(seconds=self.expires_after_seconds) return int((now + period).timestamp()) def _save_to_cache(self, data_record: DataRecord): """ Save data_record to local cache except when status is "INPROGRESS" NOTE: We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the execution environment Parameters ---------- data_record: DataRecord DataRecord instance Returns ------- """ if not self.use_local_cache: return if data_record.status == STATUS_CONSTANTS["INPROGRESS"]: return self._cache[data_record.idempotency_key] = data_record def _retrieve_from_cache(self, idempotency_key: str): if not self.use_local_cache: return cached_record = self._cache.get(idempotency_key) if cached_record: if not cached_record.is_expired: return cached_record logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}") self._delete_from_cache(idempotency_key) def _delete_from_cache(self, idempotency_key: str): if not self.use_local_cache: return if idempotency_key in self._cache: del self._cache[idempotency_key] def save_success(self, event: Dict[str, Any], context: LambdaContext, result: dict) -> None: """ Save record of function's execution completing successfully Parameters ---------- event: Dict[str, Any] Lambda event context: LambdaContext Lambda context result: dict The response from lambda handler """ response_data = json.dumps(result, cls=Encoder) data_record = DataRecord( idempotency_key=self._get_hashed_idempotency_key(event, context), status=STATUS_CONSTANTS["COMPLETED"], expiry_timestamp=self._get_expiry_timestamp(), response_data=response_data, payload_hash=self._get_hashed_payload(event), ) logger.debug( f"Lambda successfully executed. Saving record to persistence store with " f"idempotency key: {data_record.idempotency_key}" ) self._update_record(data_record=data_record) self._save_to_cache(data_record) def save_inprogress(self, event: Dict[str, Any], context: LambdaContext) -> None: """ Save record of function's execution being in progress Parameters ---------- event: Dict[str, Any] Lambda event context: LambdaContext Lambda context """ data_record = DataRecord( idempotency_key=self._get_hashed_idempotency_key(event, context), status=STATUS_CONSTANTS["INPROGRESS"], expiry_timestamp=self._get_expiry_timestamp(), payload_hash=self._get_hashed_payload(event), ) logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}") if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key): raise IdempotencyItemAlreadyExistsError self._put_record(data_record) def delete_record(self, event: Dict[str, Any], context: LambdaContext, exception: Exception): """ Delete record from the persistence store Parameters ---------- event: Dict[str, Any] Lambda event context: LambdaContext Lambda context exception The exception raised by the lambda handler """ data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(event, context)) logger.debug( f"Lambda raised an exception ({type(exception).__name__}). Clearing in progress record in persistence " f"store for idempotency key: {data_record.idempotency_key}" ) self._delete_record(data_record) self._delete_from_cache(data_record.idempotency_key) def get_record(self, event: Dict[str, Any], context: LambdaContext) -> DataRecord: """ Calculate idempotency key for lambda_event, then retrieve item from persistence store using idempotency key and return it as a DataRecord instance.and return it as a DataRecord instance. Parameters ---------- event: Dict[str, Any] Lambda event context: LambdaContext Lambda context Returns ------- DataRecord DataRecord representation of existing record found in persistence store Raises ------ IdempotencyItemNotFoundError Exception raised if no record exists in persistence store with the idempotency key IdempotencyValidationError Event payload doesn't match the stored record for the given idempotency key """ idempotency_key = self._get_hashed_idempotency_key(event, context) cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key) if cached_record: logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}") self._validate_payload(event, cached_record) return cached_record record = self._get_record(idempotency_key) self._save_to_cache(data_record=record) self._validate_payload(event, record) return record @abstractmethod def _get_record(self, idempotency_key) -> DataRecord: """ Retrieve item from persistence store using idempotency key and return it as a DataRecord instance. Parameters ---------- idempotency_key Returns ------- DataRecord DataRecord representation of existing record found in persistence store Raises ------ IdempotencyItemNotFoundError Exception raised if no record exists in persistence store with the idempotency key """ raise NotImplementedError @abstractmethod def _put_record(self, data_record: DataRecord) -> None: """ Add a DataRecord to persistence store if it does not already exist with that key. Raise ItemAlreadyExists if a non-expired entry already exists. Parameters ---------- data_record: DataRecord DataRecord instance """ raise NotImplementedError @abstractmethod def _update_record(self, data_record: DataRecord) -> None: """ Update item in persistence store Parameters ---------- data_record: DataRecord DataRecord instance """ raise NotImplementedError @abstractmethod def _delete_record(self, data_record: DataRecord) -> None: """ Remove item from persistence store Parameters ---------- data_record: DataRecord DataRecord instance """ raise NotImplementedError
Ancestors
- abc.ABC
Subclasses
Static methods
def is_missing_idempotency_key(data) ‑> bool
-
Expand source code
@staticmethod def is_missing_idempotency_key(data) -> bool: if type(data).__name__ in ("tuple", "list", "dict"): return all(x is None for x in data) return not data
Methods
def configure(self, config: IdempotencyConfig) ‑> NoneType
-
Initialize the base persistence layer from the configuration settings
Parameters
config
:IdempotencyConfig
- Idempotency configuration settings
Expand source code
def configure(self, config: IdempotencyConfig) -> None: """ Initialize the base persistence layer from the configuration settings Parameters ---------- config: IdempotencyConfig Idempotency configuration settings """ if self.configured: # Prevent being reconfigured multiple times return self.configured = True self.event_key_jmespath = config.event_key_jmespath if config.event_key_jmespath: self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath) self.jmespath_options = config.jmespath_options if not self.jmespath_options: self.jmespath_options = {"custom_functions": PowertoolsFunctions()} if config.payload_validation_jmespath: self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath) self.payload_validation_enabled = True self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key self.expires_after_seconds = config.expires_after_seconds self.use_local_cache = config.use_local_cache if self.use_local_cache: self._cache = LRUDict(max_items=config.local_cache_max_items) self.hash_function = getattr(hashlib, config.hash_function)
def delete_record(self, event: Dict[str, Any], context: LambdaContext, exception: Exception)
-
Delete record from the persistence store
Parameters
event
:Dict[str, Any]
- Lambda event
context
:LambdaContext
- Lambda context
exception
- The exception raised by the lambda handler
Expand source code
def delete_record(self, event: Dict[str, Any], context: LambdaContext, exception: Exception): """ Delete record from the persistence store Parameters ---------- event: Dict[str, Any] Lambda event context: LambdaContext Lambda context exception The exception raised by the lambda handler """ data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(event, context)) logger.debug( f"Lambda raised an exception ({type(exception).__name__}). Clearing in progress record in persistence " f"store for idempotency key: {data_record.idempotency_key}" ) self._delete_record(data_record) self._delete_from_cache(data_record.idempotency_key)
def get_record(self, event: Dict[str, Any], context: LambdaContext) ‑> DataRecord
-
Calculate idempotency key for lambda_event, then retrieve item from persistence store using idempotency key and return it as a DataRecord instance.and return it as a DataRecord instance.
Parameters
event
:Dict[str, Any]
- Lambda event
context
:LambdaContext
- Lambda context
Returns
DataRecord
- DataRecord representation of existing record found in persistence store
Raises
IdempotencyItemNotFoundError
- Exception raised if no record exists in persistence store with the idempotency key
IdempotencyValidationError
- Event payload doesn't match the stored record for the given idempotency key
Expand source code
def get_record(self, event: Dict[str, Any], context: LambdaContext) -> DataRecord: """ Calculate idempotency key for lambda_event, then retrieve item from persistence store using idempotency key and return it as a DataRecord instance.and return it as a DataRecord instance. Parameters ---------- event: Dict[str, Any] Lambda event context: LambdaContext Lambda context Returns ------- DataRecord DataRecord representation of existing record found in persistence store Raises ------ IdempotencyItemNotFoundError Exception raised if no record exists in persistence store with the idempotency key IdempotencyValidationError Event payload doesn't match the stored record for the given idempotency key """ idempotency_key = self._get_hashed_idempotency_key(event, context) cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key) if cached_record: logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}") self._validate_payload(event, cached_record) return cached_record record = self._get_record(idempotency_key) self._save_to_cache(data_record=record) self._validate_payload(event, record) return record
def save_inprogress(self, event: Dict[str, Any], context: LambdaContext) ‑> NoneType
-
Save record of function's execution being in progress
Parameters
event
:Dict[str, Any]
- Lambda event
context
:LambdaContext
- Lambda context
Expand source code
def save_inprogress(self, event: Dict[str, Any], context: LambdaContext) -> None: """ Save record of function's execution being in progress Parameters ---------- event: Dict[str, Any] Lambda event context: LambdaContext Lambda context """ data_record = DataRecord( idempotency_key=self._get_hashed_idempotency_key(event, context), status=STATUS_CONSTANTS["INPROGRESS"], expiry_timestamp=self._get_expiry_timestamp(), payload_hash=self._get_hashed_payload(event), ) logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}") if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key): raise IdempotencyItemAlreadyExistsError self._put_record(data_record)
def save_success(self, event: Dict[str, Any], context: LambdaContext, result: dict) ‑> NoneType
-
Save record of function's execution completing successfully
Parameters
event
:Dict[str, Any]
- Lambda event
context
:LambdaContext
- Lambda context
result
:dict
- The response from lambda handler
Expand source code
def save_success(self, event: Dict[str, Any], context: LambdaContext, result: dict) -> None: """ Save record of function's execution completing successfully Parameters ---------- event: Dict[str, Any] Lambda event context: LambdaContext Lambda context result: dict The response from lambda handler """ response_data = json.dumps(result, cls=Encoder) data_record = DataRecord( idempotency_key=self._get_hashed_idempotency_key(event, context), status=STATUS_CONSTANTS["COMPLETED"], expiry_timestamp=self._get_expiry_timestamp(), response_data=response_data, payload_hash=self._get_hashed_payload(event), ) logger.debug( f"Lambda successfully executed. Saving record to persistence store with " f"idempotency key: {data_record.idempotency_key}" ) self._update_record(data_record=data_record) self._save_to_cache(data_record)
class DataRecord (idempotency_key, status: str = '', expiry_timestamp: int = None, response_data: Union[str, NoneType] = '', payload_hash: str = None)
-
Data Class for idempotency records.
Parameters
idempotency_key
:str
- hashed representation of the idempotent data
status
:str
, optional- status of the idempotent record
expiry_timestamp
:int
, optional- time before the record should expire, in seconds
payload_hash
:str
, optional- hashed representation of payload
response_data
:str
, optional- response data from previous executions using the record
Expand source code
class DataRecord: """ Data Class for idempotency records. """ def __init__( self, idempotency_key, status: str = "", expiry_timestamp: int = None, response_data: Optional[str] = "", payload_hash: str = None, ) -> None: """ Parameters ---------- idempotency_key: str hashed representation of the idempotent data status: str, optional status of the idempotent record expiry_timestamp: int, optional time before the record should expire, in seconds payload_hash: str, optional hashed representation of payload response_data: str, optional response data from previous executions using the record """ self.idempotency_key = idempotency_key self.payload_hash = payload_hash self.expiry_timestamp = expiry_timestamp self._status = status self.response_data = response_data @property def is_expired(self) -> bool: """ Check if data record is expired Returns ------- bool Whether the record is currently expired or not """ return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp) @property def status(self) -> str: """ Get status of data record Returns ------- str """ if self.is_expired: return STATUS_CONSTANTS["EXPIRED"] elif self._status in STATUS_CONSTANTS.values(): return self._status else: raise IdempotencyInvalidStatusError(self._status) def response_json_as_dict(self) -> dict: """ Get response data deserialized to python dict Returns ------- dict previous response data deserialized """ return json.loads(self.response_data)
Instance variables
var is_expired : bool
-
Check if data record is expired
Returns
bool
- Whether the record is currently expired or not
Expand source code
@property def is_expired(self) -> bool: """ Check if data record is expired Returns ------- bool Whether the record is currently expired or not """ return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)
var status : str
-
Get status of data record
Returns
str
Expand source code
@property def status(self) -> str: """ Get status of data record Returns ------- str """ if self.is_expired: return STATUS_CONSTANTS["EXPIRED"] elif self._status in STATUS_CONSTANTS.values(): return self._status else: raise IdempotencyInvalidStatusError(self._status)
Methods
def response_json_as_dict(self) ‑> dict
-
Get response data deserialized to python dict
Returns
dict
- previous response data deserialized
Expand source code
def response_json_as_dict(self) -> dict: """ Get response data deserialized to python dict Returns ------- dict previous response data deserialized """ return json.loads(self.response_data)