Module aws_lambda_powertools.utilities.idempotency
Utility for adding idempotency to lambda functions
Expand source code
"""
Utility for adding idempotency to lambda functions
"""
from aws_lambda_powertools.utilities.idempotency.persistence.base import BasePersistenceLayer
from aws_lambda_powertools.utilities.idempotency.persistence.dynamodb import DynamoDBPersistenceLayer
from .idempotency import IdempotencyConfig, idempotent
__all__ = ("DynamoDBPersistenceLayer", "BasePersistenceLayer", "idempotent", "IdempotencyConfig")
Sub-modules
aws_lambda_powertools.utilities.idempotency.config
aws_lambda_powertools.utilities.idempotency.exceptions
-
Idempotency errors
aws_lambda_powertools.utilities.idempotency.idempotency
-
Primary interface for idempotent Lambda functions utility
aws_lambda_powertools.utilities.idempotency.persistence
Functions
def idempotent(handler: Callable[[Any, aws_lambda_powertools.utilities.lambda_context.LambdaContext], Any], event: Dict[str, Any], context: LambdaContext, persistence_store: BasePersistenceLayer, config: IdempotencyConfig = None) ‑> Any
-
Middleware to handle idempotency
Parameters
handler
:Callable
- Lambda's handler
event
:Dict
- Lambda's Event
context
:LambdaContext
- Lambda's Context
persistence_store
:BasePersistenceLayer
- Instance of BasePersistenceLayer to store data
config
:IdempotencyConfig
- Configuration
Examples
Processes Lambda's event in an idempotent manner
>>> from aws_lambda_powertools.utilities.idempotency import ( >>> idempotent, DynamoDBPersistenceLayer, IdempotencyConfig >>> ) >>> >>> idem_config=IdempotencyConfig(event_key_jmespath="body") >>> persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store") >>> >>> @idempotent(config=idem_config, persistence_store=persistence_layer) >>> def handler(event, context): >>> return {"StatusCode": 200}
Expand source code
@lambda_handler_decorator
def idempotent(
    handler: Callable[[Any, LambdaContext], Any],
    event: Dict[str, Any],
    context: LambdaContext,
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig = None,
) -> Any:
    """
    Middleware that runs a Lambda handler through the idempotency handler

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: LambdaContext
        Lambda's Context
    persistence_store: BasePersistenceLayer
        Instance of BasePersistenceLayer to store data
    config: IdempotencyConfig
        Configuration; a default IdempotencyConfig() is used when omitted

    Returns
    -------
    Any
        Whatever IdempotencyHandler.handle() returns

    Raises
    ------
    IdempotencyInconsistentStateError
        Re-raised when the error is still occurring after all retries

    Examples
    --------
    **Processes Lambda's event in an idempotent manner**

        >>> from aws_lambda_powertools.utilities.idempotency import (
        >>>    idempotent, DynamoDBPersistenceLayer, IdempotencyConfig
        >>> )
        >>>
        >>> idem_config=IdempotencyConfig(event_key_jmespath="body")
        >>> persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")
        >>>
        >>> @idempotent(config=idem_config, persistence_store=persistence_layer)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}
    """
    idempotency_handler = IdempotencyHandler(
        lambda_handler=handler,
        event=event,
        context=context,
        persistence_store=persistence_store,
        config=config if config else IdempotencyConfig(),
    )

    # IdempotencyInconsistentStateError can happen under rare but expected cases when persistent state changes in the
    # small time between put & get requests. In most cases we can retry successfully on this exception.
    # Maintenance: Allow customers to specify number of retries
    max_handler_retries = 2
    attempts_left = max_handler_retries + 1  # the first call plus the retries
    while True:
        attempts_left -= 1
        try:
            return idempotency_handler.handle()
        except IdempotencyInconsistentStateError:
            if attempts_left == 0:
                # Retries exhausted: let the exception bubble up to the caller
                raise
Classes
class BasePersistenceLayer
-
Abstract Base Class for Idempotency persistence layer.
Initialize the defaults
Expand source code
class BasePersistenceLayer(ABC):
    """
    Abstract Base Class for Idempotency persistence layer.

    Concrete subclasses implement the storage primitives (_get_record, _put_record,
    _update_record, _delete_record); this class supplies key hashing, payload
    validation, expiry calculation and the optional local cache around them.
    """

    def __init__(self):
        """Initialize the defaults"""
        self.configured = False
        self.event_key_jmespath: Optional[str] = None
        self.event_key_compiled_jmespath = None
        self.jmespath_options: Optional[dict] = None
        self.payload_validation_enabled = False
        self.validation_key_jmespath = None
        self.raise_on_no_idempotency_key = False
        self.expires_after_seconds: int = 60 * 60  # 1 hour default
        self.use_local_cache = False
        self._cache: Optional[LRUDict] = None
        self.hash_function = None

    def configure(self, config: IdempotencyConfig) -> None:
        """
        Initialize the base persistence layer from the configuration settings

        Parameters
        ----------
        config: IdempotencyConfig
            Idempotency configuration settings
        """
        if self.configured:
            # Prevent being reconfigured multiple times
            return
        self.configured = True

        self.event_key_jmespath = config.event_key_jmespath
        if config.event_key_jmespath:
            self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath)
        self.jmespath_options = config.jmespath_options
        if not self.jmespath_options:
            self.jmespath_options = {"custom_functions": PowertoolsFunctions()}
        if config.payload_validation_jmespath:
            self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath)
            self.payload_validation_enabled = True
        self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
        self.expires_after_seconds = config.expires_after_seconds
        self.use_local_cache = config.use_local_cache
        if self.use_local_cache:
            self._cache = LRUDict(max_items=config.local_cache_max_items)
        self.hash_function = getattr(hashlib, config.hash_function)

    def _get_hashed_idempotency_key(self, event: Dict[str, Any], context: LambdaContext) -> str:
        """
        Extract data from lambda event using event key jmespath, and return a hashed representation

        Parameters
        ----------
        event: Dict[str, Any]
            Lambda event
        context: LambdaContext
            Lambda context

        Returns
        -------
        str
            "<function name>#<hash>" built from the data extracted by the jmespath expression

        Raises
        ------
        IdempotencyKeyError
            No key data was found and raise_on_no_idempotency_key is enabled
        """
        data = event

        if self.event_key_jmespath:
            data = self.event_key_compiled_jmespath.search(event, options=jmespath.Options(**self.jmespath_options))

        if self.is_missing_idempotency_key(data):
            if self.raise_on_no_idempotency_key:
                raise IdempotencyKeyError("No data found to create a hashed idempotency_key")
            warnings.warn(f"No value found for idempotency_key. jmespath: {self.event_key_jmespath}")

        generated_hash = self._generate_hash(data)
        return f"{context.function_name}#{generated_hash}"

    @staticmethod
    def is_missing_idempotency_key(data) -> bool:
        """
        Tell whether the extracted idempotency key data should be treated as absent

        Parameters
        ----------
        data
            Value extracted from the event

        Returns
        -------
        bool
            True for a tuple/list/dict whose items are all None (including an empty one;
            a dict is iterated by key), and for other falsy values such as None or "".
            Numbers and booleans are never considered missing.
        """
        if isinstance(data, (tuple, list, dict)):
            return all(x is None for x in data)
        if isinstance(data, (int, float, bool)):
            # 0, 0.0 and False are legitimate key values; plain truthiness would reject them
            return False
        return not data

    def _get_hashed_payload(self, lambda_event: Dict[str, Any]) -> str:
        """
        Extract data from lambda event using validation key jmespath, and return a hashed representation

        Parameters
        ----------
        lambda_event: Dict[str, Any]
            Lambda event

        Returns
        -------
        str
            Hashed representation of the data extracted by the jmespath expression,
            or an empty string when payload validation is disabled
        """
        if not self.payload_validation_enabled:
            return ""
        data = self.validation_key_jmespath.search(lambda_event)
        return self._generate_hash(data)

    def _generate_hash(self, data: Any) -> str:
        """
        Generate a hash value from the provided data

        Parameters
        ----------
        data: Any
            The data to hash

        Returns
        -------
        str
            Hashed representation of the provided data
        """
        # sort_keys makes the serialization canonical, so payloads that differ only in
        # dict key order produce the same hash (and therefore the same idempotency key)
        hashed_data = self.hash_function(json.dumps(data, cls=Encoder, sort_keys=True).encode())
        return hashed_data.hexdigest()

    def _validate_payload(self, lambda_event: Dict[str, Any], data_record: DataRecord) -> None:
        """
        Validate that the hashed payload matches in the lambda event and stored data record

        Parameters
        ----------
        lambda_event: Dict[str, Any]
            Lambda event
        data_record: DataRecord
            DataRecord instance

        Raises
        ------
        IdempotencyValidationError
            Event payload doesn't match the stored record for the given idempotency key
        """
        if self.payload_validation_enabled:
            lambda_payload_hash = self._get_hashed_payload(lambda_event)
            if data_record.payload_hash != lambda_payload_hash:
                raise IdempotencyValidationError("Payload does not match stored record for this event key")

    def _get_expiry_timestamp(self) -> int:
        """
        Compute when a record written now should expire

        Returns
        -------
        int
            unix timestamp of expiry date for idempotency record
        """
        now = datetime.datetime.now()
        period = datetime.timedelta(seconds=self.expires_after_seconds)
        return int((now + period).timestamp())

    def _save_to_cache(self, data_record: DataRecord):
        """
        Save data_record to local cache except when status is "INPROGRESS"

        NOTE: We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the
        execution environment

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """
        if not self.use_local_cache:
            return
        if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
            return
        self._cache[data_record.idempotency_key] = data_record

    def _retrieve_from_cache(self, idempotency_key: str):
        """
        Look up a record in the local cache, evicting it if it has expired

        Parameters
        ----------
        idempotency_key: str
            Hashed idempotency key

        Returns
        -------
        The cached record when caching is enabled and a non-expired entry exists, otherwise None
        """
        if not self.use_local_cache:
            return
        cached_record = self._cache.get(idempotency_key)
        if cached_record:
            if not cached_record.is_expired:
                return cached_record
            logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}")
            self._delete_from_cache(idempotency_key)

    def _delete_from_cache(self, idempotency_key: str):
        """
        Remove a record from the local cache if caching is enabled and the key is present

        Parameters
        ----------
        idempotency_key: str
            Hashed idempotency key
        """
        if not self.use_local_cache:
            return
        if idempotency_key in self._cache:
            del self._cache[idempotency_key]

    def save_success(self, event: Dict[str, Any], context: LambdaContext, result: dict) -> None:
        """
        Save record of function's execution completing successfully

        Parameters
        ----------
        event: Dict[str, Any]
            Lambda event
        context: LambdaContext
            Lambda context
        result: dict
            The response from lambda handler
        """
        response_data = json.dumps(result, cls=Encoder)

        data_record = DataRecord(
            idempotency_key=self._get_hashed_idempotency_key(event, context),
            status=STATUS_CONSTANTS["COMPLETED"],
            expiry_timestamp=self._get_expiry_timestamp(),
            response_data=response_data,
            payload_hash=self._get_hashed_payload(event),
        )
        logger.debug(
            f"Lambda successfully executed. Saving record to persistence store with "
            f"idempotency key: {data_record.idempotency_key}"
        )
        self._update_record(data_record=data_record)

        self._save_to_cache(data_record)

    def save_inprogress(self, event: Dict[str, Any], context: LambdaContext) -> None:
        """
        Save record of function's execution being in progress

        Parameters
        ----------
        event: Dict[str, Any]
            Lambda event
        context: LambdaContext
            Lambda context

        Raises
        ------
        IdempotencyItemAlreadyExistsError
            A non-expired record for this key is already in the local cache
        """
        data_record = DataRecord(
            idempotency_key=self._get_hashed_idempotency_key(event, context),
            status=STATUS_CONSTANTS["INPROGRESS"],
            expiry_timestamp=self._get_expiry_timestamp(),
            payload_hash=self._get_hashed_payload(event),
        )

        logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

        # A cache hit means the key was already processed here; skip the store round-trip
        if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
            raise IdempotencyItemAlreadyExistsError

        self._put_record(data_record)

    def delete_record(self, event: Dict[str, Any], context: LambdaContext, exception: Exception):
        """
        Delete record from the persistence store

        Parameters
        ----------
        event: Dict[str, Any]
            Lambda event
        context: LambdaContext
            Lambda context
        exception
            The exception raised by the lambda handler
        """
        data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(event, context))

        logger.debug(
            f"Lambda raised an exception ({type(exception).__name__}). Clearing in progress record in persistence "
            f"store for idempotency key: {data_record.idempotency_key}"
        )
        self._delete_record(data_record)

        self._delete_from_cache(data_record.idempotency_key)

    def get_record(self, event: Dict[str, Any], context: LambdaContext) -> DataRecord:
        """
        Calculate idempotency key for lambda_event, then retrieve item from persistence store using idempotency key
        and return it as a DataRecord instance.

        Parameters
        ----------
        event: Dict[str, Any]
            Lambda event
        context: LambdaContext
            Lambda context

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        IdempotencyValidationError
            Event payload doesn't match the stored record for the given idempotency key
        """
        idempotency_key = self._get_hashed_idempotency_key(event, context)

        cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
        if cached_record:
            logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
            self._validate_payload(event, cached_record)
            return cached_record

        record = self._get_record(idempotency_key)

        self._save_to_cache(data_record=record)

        self._validate_payload(event, record)
        return record

    @abstractmethod
    def _get_record(self, idempotency_key) -> DataRecord:
        """
        Retrieve item from persistence store using idempotency key and return it as a DataRecord instance.

        Parameters
        ----------
        idempotency_key

        Returns
        -------
        DataRecord
            DataRecord representation of existing record found in persistence store

        Raises
        ------
        IdempotencyItemNotFoundError
            Exception raised if no record exists in persistence store with the idempotency key
        """
        raise NotImplementedError

    @abstractmethod
    def _put_record(self, data_record: DataRecord) -> None:
        """
        Add a DataRecord to persistence store if it does not already exist with that key. Raise ItemAlreadyExists
        if a non-expired entry already exists.

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """
        raise NotImplementedError

    @abstractmethod
    def _update_record(self, data_record: DataRecord) -> None:
        """
        Update item in persistence store

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """
        raise NotImplementedError

    @abstractmethod
    def _delete_record(self, data_record: DataRecord) -> None:
        """
        Remove item from persistence store

        Parameters
        ----------
        data_record: DataRecord
            DataRecord instance
        """
        raise NotImplementedError
Ancestors
- abc.ABC
Subclasses
Static methods
def is_missing_idempotency_key(data) ‑> bool
-
Expand source code
@staticmethod
def is_missing_idempotency_key(data) -> bool:
    """
    Tell whether the extracted idempotency key data should be treated as absent

    Parameters
    ----------
    data
        Value extracted from the event

    Returns
    -------
    bool
        True for a tuple/list/dict whose items are all None (including an empty one;
        a dict is iterated by key), and for other falsy values such as None or "".
        Numbers and booleans are never considered missing.
    """
    if isinstance(data, (tuple, list, dict)):
        return all(x is None for x in data)
    if isinstance(data, (int, float, bool)):
        # 0, 0.0 and False are legitimate key values; plain truthiness would reject them
        return False
    return not data
Methods
def configure(self, config: IdempotencyConfig) ‑> NoneType
-
Initialize the base persistence layer from the configuration settings
Parameters
config
:IdempotencyConfig
- Idempotency configuration settings
Expand source code
def configure(self, config: IdempotencyConfig) -> None:
    """
    Copy the idempotency settings from ``config`` onto this persistence layer

    Only the first call has any effect; once ``self.configured`` is set,
    later calls return immediately and leave the layer untouched.

    Parameters
    ----------
    config: IdempotencyConfig
        Idempotency configuration settings
    """
    if self.configured:
        # Already set up: a layer is configured only once
        return
    self.configured = True

    key_expression = config.event_key_jmespath
    self.event_key_jmespath = key_expression
    if key_expression:
        self.event_key_compiled_jmespath = jmespath.compile(key_expression)

    # Fall back to the Powertools custom functions when no jmespath options were supplied
    self.jmespath_options = config.jmespath_options or {"custom_functions": PowertoolsFunctions()}

    validation_expression = config.payload_validation_jmespath
    if validation_expression:
        self.validation_key_jmespath = jmespath.compile(validation_expression)
        self.payload_validation_enabled = True

    self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
    self.expires_after_seconds = config.expires_after_seconds

    self.use_local_cache = config.use_local_cache
    if self.use_local_cache:
        self._cache = LRUDict(max_items=config.local_cache_max_items)

    # hash_function is the name of a hashlib constructor, e.g. "md5"
    self.hash_function = getattr(hashlib, config.hash_function)
def delete_record(self, event: Dict[str, Any], context: LambdaContext, exception: Exception)
-
Delete record from the persistence store
Parameters
event
:Dict[str, Any]
- Lambda event
context
:LambdaContext
- Lambda context
exception
- The exception raised by the lambda handler
Expand source code
def delete_record(self, event: Dict[str, Any], context: LambdaContext, exception: Exception):
    """
    Remove the record for this event from the persistence store and the local cache

    Parameters
    ----------
    event: Dict[str, Any]
        Lambda event
    context: LambdaContext
        Lambda context
    exception
        The exception raised by the lambda handler (only its type name is logged)
    """
    hashed_key = self._get_hashed_idempotency_key(event, context)
    record = DataRecord(idempotency_key=hashed_key)
    failure_name = type(exception).__name__

    logger.debug(
        f"Lambda raised an exception ({failure_name}). Clearing in progress record in persistence "
        f"store for idempotency key: {record.idempotency_key}"
    )

    # Store first, then cache, so both stop reporting the key as in progress
    self._delete_record(record)
    self._delete_from_cache(record.idempotency_key)
def get_record(self, event: Dict[str, Any], context: LambdaContext) ‑> DataRecord
-
Calculate idempotency key for lambda_event, then retrieve item from persistence store using idempotency key and return it as a DataRecord instance.
Parameters
event
:Dict[str, Any]
- Lambda event
context
:LambdaContext
- Lambda context
Returns
DataRecord
- DataRecord representation of existing record found in persistence store
Raises
IdempotencyItemNotFoundError
- Exception raised if no record exists in persistence store with the idempotency key
IdempotencyValidationError
- Event payload doesn't match the stored record for the given idempotency key
Expand source code
def get_record(self, event: Dict[str, Any], context: LambdaContext) -> DataRecord:
    """
    Calculate idempotency key for lambda_event, then retrieve item from the local cache or,
    failing that, from the persistence store, and return it as a DataRecord instance.

    Parameters
    ----------
    event: Dict[str, Any]
        Lambda event
    context: LambdaContext
        Lambda context

    Returns
    -------
    DataRecord
        DataRecord representation of existing record found in persistence store

    Raises
    ------
    IdempotencyItemNotFoundError
        Exception raised if no record exists in persistence store with the idempotency key
    IdempotencyValidationError
        Event payload doesn't match the stored record for the given idempotency key
    """
    key = self._get_hashed_idempotency_key(event, context)

    found = self._retrieve_from_cache(idempotency_key=key)
    if found:
        logger.debug(f"Idempotency record found in cache with idempotency key: {key}")
    else:
        # Cache miss: read from the store and remember the result locally
        found = self._get_record(key)
        self._save_to_cache(data_record=found)

    # Validation applies regardless of where the record came from
    self._validate_payload(event, found)
    return found
def save_inprogress(self, event: Dict[str, Any], context: LambdaContext) ‑> NoneType
-
Save record of function's execution being in progress
Parameters
event
:Dict[str, Any]
- Lambda event
context
:LambdaContext
- Lambda context
Expand source code
def save_inprogress(self, event: Dict[str, Any], context: LambdaContext) -> None:
    """
    Write an "INPROGRESS" record for this event to the persistence store

    Parameters
    ----------
    event: Dict[str, Any]
        Lambda event
    context: LambdaContext
        Lambda context

    Raises
    ------
    IdempotencyItemAlreadyExistsError
        A non-expired record for this key is already in the local cache
    """
    hashed_key = self._get_hashed_idempotency_key(event, context)
    in_progress = STATUS_CONSTANTS["INPROGRESS"]
    expires_at = self._get_expiry_timestamp()
    payload_digest = self._get_hashed_payload(event)

    pending = DataRecord(
        idempotency_key=hashed_key,
        status=in_progress,
        expiry_timestamp=expires_at,
        payload_hash=payload_digest,
    )

    logger.debug(f"Saving in progress record for idempotency key: {pending.idempotency_key}")

    # A cache hit means this key is already known locally; no need to ask the store
    already_cached = self._retrieve_from_cache(idempotency_key=pending.idempotency_key)
    if already_cached:
        raise IdempotencyItemAlreadyExistsError

    self._put_record(pending)
def save_success(self, event: Dict[str, Any], context: LambdaContext, result: dict) ‑> NoneType
-
Save record of function's execution completing successfully
Parameters
event
:Dict[str, Any]
- Lambda event
context
:LambdaContext
- Lambda context
result
:dict
- The response from lambda handler
Expand source code
def save_success(self, event: Dict[str, Any], context: LambdaContext, result: dict) -> None:
    """
    Persist a "COMPLETED" record holding the handler's JSON-serialized response

    Parameters
    ----------
    event: Dict[str, Any]
        Lambda event
    context: LambdaContext
        Lambda context
    result: dict
        The response from lambda handler
    """
    serialized_result = json.dumps(result, cls=Encoder)

    hashed_key = self._get_hashed_idempotency_key(event, context)
    completed = STATUS_CONSTANTS["COMPLETED"]
    expires_at = self._get_expiry_timestamp()
    payload_digest = self._get_hashed_payload(event)

    finished = DataRecord(
        idempotency_key=hashed_key,
        status=completed,
        expiry_timestamp=expires_at,
        response_data=serialized_result,
        payload_hash=payload_digest,
    )

    logger.debug(
        f"Lambda successfully executed. Saving record to persistence store with "
        f"idempotency key: {finished.idempotency_key}"
    )

    # Update the store first, then mirror the completed record into the local cache
    self._update_record(data_record=finished)
    self._save_to_cache(finished)
class DynamoDBPersistenceLayer (table_name: str, key_attr: str = 'id', expiry_attr: str = 'expiration', status_attr: str = 'status', data_attr: str = 'data', validation_key_attr: str = 'validation', boto_config: Union[botocore.config.Config, NoneType] = None, boto3_session: Union[boto3.session.Session, NoneType] = None)
-
DynamoDB implementation of the idempotency persistence layer (subclass of BasePersistenceLayer).
Initialize the DynamoDB client
Parameters
table_name
:str
- Name of the table to use for storing execution records
key_attr
:str
, optional- DynamoDB attribute name for key, by default "id"
expiry_attr
:str
, optional- DynamoDB attribute name for expiry timestamp, by default "expiration"
status_attr
:str
, optional- DynamoDB attribute name for status, by default "status"
data_attr
:str
, optional- DynamoDB attribute name for response data, by default "data"
boto_config
:botocore.config.Config
, optional- Botocore configuration to pass during client initialization
boto3_session
:boto3.session.Session
, optional- Boto3 session to use for AWS API communication
Examples
Create a DynamoDB persistence layer with custom settings
>>> from aws_lambda_powertools.utilities.idempotency import ( >>> idempotent, DynamoDBPersistenceLayer >>> ) >>> >>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store") >>> >>> @idempotent(persistence_store=persistence_store) >>> def handler(event, context): >>> return {"StatusCode": 200}
Expand source code
class DynamoDBPersistenceLayer(BasePersistenceLayer):
    def __init__(
        self,
        table_name: str,
        key_attr: str = "id",
        expiry_attr: str = "expiration",
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
        boto_config: Optional[Config] = None,
        boto3_session: Optional[boto3.session.Session] = None,
    ):
        """
        Initialize the DynamoDB client

        Parameters
        ----------
        table_name: str
            Name of the table to use for storing execution records
        key_attr: str, optional
            DynamoDB attribute name for key, by default "id"
        expiry_attr: str, optional
            DynamoDB attribute name for expiry timestamp, by default "expiration"
        status_attr: str, optional
            DynamoDB attribute name for status, by default "status"
        data_attr: str, optional
            DynamoDB attribute name for response data, by default "data"
        validation_key_attr: str, optional
            DynamoDB attribute name for the payload hash, by default "validation"
        boto_config: botocore.config.Config, optional
            Botocore configuration to pass during client initialization
        boto3_session : boto3.session.Session, optional
            Boto3 session to use for AWS API communication

        Examples
        --------
        **Create a DynamoDB persistence layer with custom settings**

            >>> from aws_lambda_powertools.utilities.idempotency import (
            >>>    idempotent, DynamoDBPersistenceLayer
            >>> )
            >>>
            >>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store")
            >>>
            >>> @idempotent(persistence_store=persistence_store)
            >>> def handler(event, context):
            >>>     return {"StatusCode": 200}
        """
        client_config = boto_config or Config()
        aws_session = boto3_session or boto3.session.Session()
        self._ddb_resource = aws_session.resource("dynamodb", config=client_config)

        self.table_name = table_name
        self.table = self._ddb_resource.Table(self.table_name)

        # Attribute names used when reading and writing items
        self.key_attr = key_attr
        self.expiry_attr = expiry_attr
        self.status_attr = status_attr
        self.data_attr = data_attr
        self.validation_key_attr = validation_key_attr

        super().__init__()

    def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
        """
        Translate raw item records from DynamoDB to DataRecord

        Parameters
        ----------
        item: Dict[str, Union[str, int]]
            Item format from dynamodb response

        Returns
        -------
        DataRecord
            representation of item
        """
        # Key, status and expiry are required; response data and payload hash may be absent
        key = item[self.key_attr]
        status = item[self.status_attr]
        expiry = item[self.expiry_attr]
        return DataRecord(
            idempotency_key=key,
            status=status,
            expiry_timestamp=expiry,
            response_data=item.get(self.data_attr),
            payload_hash=item.get(self.validation_key_attr),
        )

    def _get_record(self, idempotency_key) -> DataRecord:
        """Fetch the item for idempotency_key with a consistent read and convert it to a DataRecord"""
        lookup = self.table.get_item(Key={self.key_attr: idempotency_key}, ConsistentRead=True)
        try:
            raw_item = lookup["Item"]
        except KeyError:
            # No "Item" entry in the response: nothing is stored under this key
            raise IdempotencyItemNotFoundError
        else:
            return self._item_to_data_record(raw_item)

    def _put_record(self, data_record: DataRecord) -> None:
        """Conditionally write data_record; fails if a non-expired item already holds the key"""
        new_item = {
            self.key_attr: data_record.idempotency_key,
            self.expiry_attr: data_record.expiry_timestamp,
            self.status_attr: data_record.status,
        }
        if self.payload_validation_enabled:
            new_item[self.validation_key_attr] = data_record.payload_hash

        current_time = datetime.datetime.now()
        # The write succeeds only when the key is unused or the existing item has expired
        write_condition = f"attribute_not_exists({self.key_attr}) OR {self.expiry_attr} < :now"
        try:
            logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")
            self.table.put_item(
                Item=new_item,
                ConditionExpression=write_condition,
                ExpressionAttributeValues={":now": int(current_time.timestamp())},
            )
        except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException:
            logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
            raise IdempotencyItemAlreadyExistsError

    def _update_record(self, data_record: DataRecord):
        """Overwrite response data, expiry and status (plus payload hash when validation is on)"""
        logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")

        set_clause = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
        placeholder_values = {
            ":expiry": data_record.expiry_timestamp,
            ":response_data": data_record.response_data,
            ":status": data_record.status,
        }
        # Placeholders are used for attribute names because they are configurable
        placeholder_names = {
            "#response_data": self.data_attr,
            "#expiry": self.expiry_attr,
            "#status": self.status_attr,
        }

        if self.payload_validation_enabled:
            set_clause += ", #validation_key = :validation_key"
            placeholder_values[":validation_key"] = data_record.payload_hash
            placeholder_names["#validation_key"] = self.validation_key_attr

        self.table.update_item(
            Key={self.key_attr: data_record.idempotency_key},
            UpdateExpression=set_clause,
            ExpressionAttributeValues=placeholder_values,
            ExpressionAttributeNames=placeholder_names,
        )

    def _delete_record(self, data_record: DataRecord) -> None:
        """Delete the item stored under data_record's idempotency key"""
        logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}")
        item_key = {self.key_attr: data_record.idempotency_key}
        self.table.delete_item(Key=item_key)
Ancestors
- BasePersistenceLayer
- abc.ABC
Inherited members
class IdempotencyConfig (event_key_jmespath: str = '', payload_validation_jmespath: str = '', jmespath_options: Dict = None, raise_on_no_idempotency_key: bool = False, expires_after_seconds: int = 3600, use_local_cache: bool = False, local_cache_max_items: int = 256, hash_function: str = 'md5')
-
Initialize the idempotency configuration
Parameters
event_key_jmespath
:str
- A jmespath expression to extract the idempotency key from the event record
payload_validation_jmespath
:str
- A jmespath expression to extract the payload to be validated from the event record
raise_on_no_idempotency_key
:bool
, optional- Raise exception if no idempotency key was found in the request, by default False
expires_after_seconds
:int
- The number of seconds to wait before a record is expired
use_local_cache
:bool
, optional- Whether to locally cache idempotency results, by default False
local_cache_max_items
:int
, optional- Max number of items to store in local cache, by default 256
hash_function
:str
, optional- Function to use for calculating hashes, by default md5.
Expand source code
class IdempotencyConfig:
    def __init__(
        self,
        event_key_jmespath: str = "",
        payload_validation_jmespath: str = "",
        jmespath_options: Dict = None,
        raise_on_no_idempotency_key: bool = False,
        expires_after_seconds: int = 60 * 60,  # 1 hour default
        use_local_cache: bool = False,
        local_cache_max_items: int = 256,
        hash_function: str = "md5",
    ):
        """
        Initialize the idempotency configuration

        The values are stored unchanged as attributes of the same name.

        Parameters
        ----------
        event_key_jmespath: str
            A jmespath expression to extract the idempotency key from the event record
        payload_validation_jmespath: str
            A jmespath expression to extract the payload to be validated from the event record
        jmespath_options: Dict, optional
            Keyword arguments for jmespath.Options used when searching with the event key
            expression, by default None
        raise_on_no_idempotency_key: bool, optional
            Raise exception if no idempotency key was found in the request, by default False
        expires_after_seconds: int
            The number of seconds to wait before a record is expired, by default 3600 (1 hour)
        use_local_cache: bool, optional
            Whether to locally cache idempotency results, by default False
        local_cache_max_items: int, optional
            Max number of items to store in local cache, by default 256
        hash_function: str, optional
            Name of the hashlib function to use for calculating hashes, by default md5.
        """
        self.event_key_jmespath = event_key_jmespath
        self.payload_validation_jmespath = payload_validation_jmespath
        self.jmespath_options = jmespath_options
        self.raise_on_no_idempotency_key = raise_on_no_idempotency_key
        self.expires_after_seconds = expires_after_seconds
        self.use_local_cache = use_local_cache
        self.local_cache_max_items = local_cache_max_items
        self.hash_function = hash_function