Module aws_lambda_powertools.utilities.idempotency
Utility for adding idempotency to lambda functions
Expand source code
"""
Utility for adding idempotency to lambda functions
"""
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
BasePersistenceLayer,
)
from aws_lambda_powertools.utilities.idempotency.persistence.dynamodb import (
DynamoDBPersistenceLayer,
)
from .idempotency import IdempotencyConfig, idempotent, idempotent_function
__all__ = ("DynamoDBPersistenceLayer", "BasePersistenceLayer", "idempotent", "idempotent_function", "IdempotencyConfig")
Sub-modules
aws_lambda_powertools.utilities.idempotency.base
aws_lambda_powertools.utilities.idempotency.config
aws_lambda_powertools.utilities.idempotency.exceptions
-
Idempotency errors
aws_lambda_powertools.utilities.idempotency.idempotency
-
Primary interface for idempotent Lambda functions utility
aws_lambda_powertools.utilities.idempotency.persistence
Functions
def idempotent(handler: Callable[[Any, aws_lambda_powertools.utilities.lambda_context.LambdaContext], Any], event: Dict[str, Any], context: LambdaContext, persistence_store: BasePersistenceLayer, config: Optional[IdempotencyConfig] = None, **kwargs) ‑> Any
-
Decorator to handle idempotency
Parameters
handler
:Callable
- Lambda's handler
event
:Dict
- Lambda's Event
context
:Dict
- Lambda's Context
persistence_store
:BasePersistenceLayer
- Instance of BasePersistenceLayer to store data
config
:IdempotencyConfig
- Configuration
Examples
Processes Lambda's event in an idempotent manner
>>> from aws_lambda_powertools.utilities.idempotency import ( >>> idempotent, DynamoDBPersistenceLayer, IdempotencyConfig >>> ) >>> >>> idem_config=IdempotencyConfig(event_key_jmespath="body") >>> persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store") >>> >>> @idempotent(config=idem_config, persistence_store=persistence_layer) >>> def handler(event, context): >>> return {"StatusCode": 200}
Expand source code
@lambda_handler_decorator def idempotent( handler: Callable[[Any, LambdaContext], Any], event: Dict[str, Any], context: LambdaContext, persistence_store: BasePersistenceLayer, config: Optional[IdempotencyConfig] = None, **kwargs, ) -> Any: """ Decorator to handle idempotency Parameters ---------- handler: Callable Lambda's handler event: Dict Lambda's Event context: Dict Lambda's Context persistence_store: BasePersistenceLayer Instance of BasePersistenceLayer to store data config: IdempotencyConfig Configuration Examples -------- **Processes Lambda's event in an idempotent manner** >>> from aws_lambda_powertools.utilities.idempotency import ( >>> idempotent, DynamoDBPersistenceLayer, IdempotencyConfig >>> ) >>> >>> idem_config=IdempotencyConfig(event_key_jmespath="body") >>> persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store") >>> >>> @idempotent(config=idem_config, persistence_store=persistence_layer) >>> def handler(event, context): >>> return {"StatusCode": 200} """ if os.getenv(constants.IDEMPOTENCY_DISABLED_ENV): return handler(event, context) config = config or IdempotencyConfig() config.register_lambda_context(context) args = event, context idempotency_handler = IdempotencyHandler( function=handler, function_payload=event, config=config, persistence_store=persistence_store, function_args=args, function_kwargs=kwargs, ) return idempotency_handler.handle()
def idempotent_function(function: Optional[~AnyCallableT] = None, *, data_keyword_argument: str, persistence_store: BasePersistenceLayer, config: Optional[IdempotencyConfig] = None) ‑> Any
-
Decorator to handle idempotency of any function
Parameters
function
:Callable
- Function to be decorated
data_keyword_argument
:str
- Keyword parameter name in function's signature that we should hash as idempotency key, e.g. "order"
persistence_store
:BasePersistenceLayer
- Instance of BasePersistenceLayer to store data
config
:IdempotencyConfig
- Configuration
Examples
Processes an order in an idempotent manner
from aws_lambda_powertools.utilities.idempotency import ( idempotent_function, DynamoDBPersistenceLayer, IdempotencyConfig ) idem_config=IdempotencyConfig(event_key_jmespath="order_id") persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store") @idempotent_function(data_keyword_argument="order", config=idem_config, persistence_store=persistence_layer) def process_order(customer_id: str, order: dict, **kwargs): return {"StatusCode": 200}
Expand source code
def idempotent_function( function: Optional[AnyCallableT] = None, *, data_keyword_argument: str, persistence_store: BasePersistenceLayer, config: Optional[IdempotencyConfig] = None, ) -> Any: """ Decorator to handle idempotency of any function Parameters ---------- function: Callable Function to be decorated data_keyword_argument: str Keyword parameter name in function's signature that we should hash as idempotency key, e.g. "order" persistence_store: BasePersistenceLayer Instance of BasePersistenceLayer to store data config: IdempotencyConfig Configuration Examples -------- **Processes an order in an idempotent manner** from aws_lambda_powertools.utilities.idempotency import ( idempotent_function, DynamoDBPersistenceLayer, IdempotencyConfig ) idem_config=IdempotencyConfig(event_key_jmespath="order_id") persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store") @idempotent_function(data_keyword_argument="order", config=idem_config, persistence_store=persistence_layer) def process_order(customer_id: str, order: dict, **kwargs): return {"StatusCode": 200} """ if not function: return cast( AnyCallableT, functools.partial( idempotent_function, data_keyword_argument=data_keyword_argument, persistence_store=persistence_store, config=config, ), ) config = config or IdempotencyConfig() @functools.wraps(function) def decorate(*args, **kwargs): if os.getenv(constants.IDEMPOTENCY_DISABLED_ENV): return function(*args, **kwargs) if data_keyword_argument not in kwargs: raise RuntimeError( f"Unable to extract '{data_keyword_argument}' from keyword arguments." f" Ensure this exists in your function's signature as well as the caller used it as a keyword argument" ) payload = kwargs.get(data_keyword_argument) idempotency_handler = IdempotencyHandler( function=function, function_payload=payload, config=config, persistence_store=persistence_store, function_args=args, function_kwargs=kwargs, ) return idempotency_handler.handle() return cast(AnyCallableT, decorate)
Classes
class BasePersistenceLayer
-
Abstract Base Class for Idempotency persistence layer.
Initialize the defaults
Expand source code
class BasePersistenceLayer(ABC): """ Abstract Base Class for Idempotency persistence layer. """ def __init__(self): """Initialize the defaults""" self.function_name = "" self.configured = False self.event_key_jmespath: Optional[str] = None self.event_key_compiled_jmespath = None self.jmespath_options: Optional[dict] = None self.payload_validation_enabled = False self.validation_key_jmespath = None self.raise_on_no_idempotency_key = False self.expires_after_seconds: int = 60 * 60 # 1 hour default self.use_local_cache = False self.hash_function = None def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) -> None: """ Initialize the base persistence layer from the configuration settings Parameters ---------- config: IdempotencyConfig Idempotency configuration settings function_name: str, Optional The name of the function being decorated """ self.function_name = f"{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, 'test-func')}.{function_name or ''}" if self.configured: # Prevent being reconfigured multiple times return self.configured = True self.event_key_jmespath = config.event_key_jmespath if config.event_key_jmespath: self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath) self.jmespath_options = config.jmespath_options if not self.jmespath_options: self.jmespath_options = {"custom_functions": PowertoolsFunctions()} if config.payload_validation_jmespath: self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath) self.payload_validation_enabled = True self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key self.expires_after_seconds = config.expires_after_seconds self.use_local_cache = config.use_local_cache if self.use_local_cache: self._cache = LRUDict(max_items=config.local_cache_max_items) self.hash_function = getattr(hashlib, config.hash_function) def _get_hashed_idempotency_key(self, data: Dict[str, Any]) -> str: """ Extract idempotency key and return a hashed representation Parameters ---------- data: Dict[str, Any] Incoming data Returns ------- str Hashed representation of the data extracted by the jmespath expression """ if self.event_key_jmespath: data = self.event_key_compiled_jmespath.search(data, options=jmespath.Options(**self.jmespath_options)) if self.is_missing_idempotency_key(data=data): if self.raise_on_no_idempotency_key: raise IdempotencyKeyError("No data found to create a hashed idempotency_key") warnings.warn(f"No value found for idempotency_key. jmespath: {self.event_key_jmespath}") generated_hash = self._generate_hash(data=data) return f"{self.function_name}#{generated_hash}" @staticmethod def is_missing_idempotency_key(data) -> bool: if type(data).__name__ in ("tuple", "list", "dict"): return all(x is None for x in data) return not data def _get_hashed_payload(self, data: Dict[str, Any]) -> str: """ Extract payload using validation key jmespath and return a hashed representation Parameters ---------- data: Dict[str, Any] Payload Returns ------- str Hashed representation of the data extracted by the jmespath expression """ if not self.payload_validation_enabled: return "" data = self.validation_key_jmespath.search(data) return self._generate_hash(data=data) def _generate_hash(self, data: Any) -> str: """ Generate a hash value from the provided data Parameters ---------- data: Any The data to hash Returns ------- str Hashed representation of the provided data """ hashed_data = self.hash_function(json.dumps(data, cls=Encoder, sort_keys=True).encode()) return hashed_data.hexdigest() def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> None: """ Validate that the hashed payload matches data provided and stored data record Parameters ---------- data: Dict[str, Any] Payload data_record: DataRecord DataRecord instance Raises ---------- IdempotencyValidationError Payload doesn't match the stored record for the given idempotency key """ if self.payload_validation_enabled: data_hash = self._get_hashed_payload(data=data) if data_record.payload_hash != data_hash: raise IdempotencyValidationError("Payload does not match stored record for this event key") def _get_expiry_timestamp(self) -> int: """ Returns ------- int unix timestamp of expiry date for idempotency record """ now = datetime.datetime.now() period = datetime.timedelta(seconds=self.expires_after_seconds) return int((now + period).timestamp()) def _save_to_cache(self, data_record: DataRecord): """ Save data_record to local cache except when status is "INPROGRESS" NOTE: We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the execution environment Parameters ---------- data_record: DataRecord DataRecord instance Returns ------- """ if not self.use_local_cache: return if data_record.status == STATUS_CONSTANTS["INPROGRESS"]: return self._cache[data_record.idempotency_key] = data_record def _retrieve_from_cache(self, idempotency_key: str): if not self.use_local_cache: return cached_record = self._cache.get(key=idempotency_key) if cached_record: if not cached_record.is_expired: return cached_record logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}") self._delete_from_cache(idempotency_key=idempotency_key) def _delete_from_cache(self, idempotency_key: str): if not self.use_local_cache: return if idempotency_key in self._cache: del self._cache[idempotency_key] def save_success(self, data: Dict[str, Any], result: dict) -> None: """ Save record of function's execution completing successfully Parameters ---------- data: Dict[str, Any] Payload result: dict The response from function """ response_data = json.dumps(result, cls=Encoder, sort_keys=True) data_record = DataRecord( idempotency_key=self._get_hashed_idempotency_key(data=data), status=STATUS_CONSTANTS["COMPLETED"], expiry_timestamp=self._get_expiry_timestamp(), response_data=response_data, payload_hash=self._get_hashed_payload(data=data), ) logger.debug( f"Function successfully executed. Saving record to persistence store with " f"idempotency key: {data_record.idempotency_key}" ) self._update_record(data_record=data_record) self._save_to_cache(data_record=data_record) def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None: """ Save record of function's execution being in progress Parameters ---------- data: Dict[str, Any] Payload remaining_time_in_millis: Optional[int] If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis """ data_record = DataRecord( idempotency_key=self._get_hashed_idempotency_key(data=data), status=STATUS_CONSTANTS["INPROGRESS"], expiry_timestamp=self._get_expiry_timestamp(), payload_hash=self._get_hashed_payload(data=data), ) if remaining_time_in_millis: now = datetime.datetime.now() period = datetime.timedelta(milliseconds=remaining_time_in_millis) timestamp = (now + period).timestamp() data_record.in_progress_expiry_timestamp = int(timestamp * 1000) else: warnings.warn( "Couldn't determine the remaining time left. " "Did you call register_lambda_context on IdempotencyConfig?" ) logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}") if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key): raise IdempotencyItemAlreadyExistsError self._put_record(data_record=data_record) def delete_record(self, data: Dict[str, Any], exception: Exception): """ Delete record from the persistence store Parameters ---------- data: Dict[str, Any] Payload exception The exception raised by the function """ data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(data=data)) logger.debug( f"Function raised an exception ({type(exception).__name__}). Clearing in progress record in persistence " f"store for idempotency key: {data_record.idempotency_key}" ) self._delete_record(data_record=data_record) self._delete_from_cache(idempotency_key=data_record.idempotency_key) def get_record(self, data: Dict[str, Any]) -> DataRecord: """ Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord. Parameters ---------- data: Dict[str, Any] Payload Returns ------- DataRecord DataRecord representation of existing record found in persistence store Raises ------ IdempotencyItemNotFoundError Exception raised if no record exists in persistence store with the idempotency key IdempotencyValidationError Payload doesn't match the stored record for the given idempotency key """ idempotency_key = self._get_hashed_idempotency_key(data=data) cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key) if cached_record: logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}") self._validate_payload(data=data, data_record=cached_record) return cached_record record = self._get_record(idempotency_key=idempotency_key) self._save_to_cache(data_record=record) self._validate_payload(data=data, data_record=record) return record @abstractmethod def _get_record(self, idempotency_key) -> DataRecord: """ Retrieve item from persistence store using idempotency key and return it as a DataRecord instance. Parameters ---------- idempotency_key Returns ------- DataRecord DataRecord representation of existing record found in persistence store Raises ------ IdempotencyItemNotFoundError Exception raised if no record exists in persistence store with the idempotency key """ raise NotImplementedError @abstractmethod def _put_record(self, data_record: DataRecord) -> None: """ Add a DataRecord to persistence store if it does not already exist with that key. Raise ItemAlreadyExists if a non-expired entry already exists. Parameters ---------- data_record: DataRecord DataRecord instance """ raise NotImplementedError @abstractmethod def _update_record(self, data_record: DataRecord) -> None: """ Update item in persistence store Parameters ---------- data_record: DataRecord DataRecord instance """ raise NotImplementedError @abstractmethod def _delete_record(self, data_record: DataRecord) -> None: """ Remove item from persistence store Parameters ---------- data_record: DataRecord DataRecord instance """ raise NotImplementedError
Ancestors
- abc.ABC
Subclasses
Static methods
def is_missing_idempotency_key(data) ‑> bool
-
Expand source code
@staticmethod def is_missing_idempotency_key(data) -> bool: if type(data).__name__ in ("tuple", "list", "dict"): return all(x is None for x in data) return not data
Methods
def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) ‑> None
-
Initialize the base persistence layer from the configuration settings
Parameters
config
:IdempotencyConfig
- Idempotency configuration settings
function_name
:str, Optional
- The name of the function being decorated
Expand source code
def configure(self, config: IdempotencyConfig, function_name: Optional[str] = None) -> None: """ Initialize the base persistence layer from the configuration settings Parameters ---------- config: IdempotencyConfig Idempotency configuration settings function_name: str, Optional The name of the function being decorated """ self.function_name = f"{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, 'test-func')}.{function_name or ''}" if self.configured: # Prevent being reconfigured multiple times return self.configured = True self.event_key_jmespath = config.event_key_jmespath if config.event_key_jmespath: self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath) self.jmespath_options = config.jmespath_options if not self.jmespath_options: self.jmespath_options = {"custom_functions": PowertoolsFunctions()} if config.payload_validation_jmespath: self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath) self.payload_validation_enabled = True self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key self.expires_after_seconds = config.expires_after_seconds self.use_local_cache = config.use_local_cache if self.use_local_cache: self._cache = LRUDict(max_items=config.local_cache_max_items) self.hash_function = getattr(hashlib, config.hash_function)
def delete_record(self, data: Dict[str, Any], exception: Exception)
-
Delete record from the persistence store
Parameters
data
:Dict[str, Any]
- Payload
exception
- The exception raised by the function
Expand source code
def delete_record(self, data: Dict[str, Any], exception: Exception): """ Delete record from the persistence store Parameters ---------- data: Dict[str, Any] Payload exception The exception raised by the function """ data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(data=data)) logger.debug( f"Function raised an exception ({type(exception).__name__}). Clearing in progress record in persistence " f"store for idempotency key: {data_record.idempotency_key}" ) self._delete_record(data_record=data_record) self._delete_from_cache(idempotency_key=data_record.idempotency_key)
def get_record(self, data: Dict[str, Any]) ‑> DataRecord
-
Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.
Parameters
data
:Dict[str, Any]
- Payload
Returns
DataRecord
- DataRecord representation of existing record found in persistence store
Raises
IdempotencyItemNotFoundError
- Exception raised if no record exists in persistence store with the idempotency key
IdempotencyValidationError
- Payload doesn't match the stored record for the given idempotency key
Expand source code
def get_record(self, data: Dict[str, Any]) -> DataRecord: """ Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord. Parameters ---------- data: Dict[str, Any] Payload Returns ------- DataRecord DataRecord representation of existing record found in persistence store Raises ------ IdempotencyItemNotFoundError Exception raised if no record exists in persistence store with the idempotency key IdempotencyValidationError Payload doesn't match the stored record for the given idempotency key """ idempotency_key = self._get_hashed_idempotency_key(data=data) cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key) if cached_record: logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}") self._validate_payload(data=data, data_record=cached_record) return cached_record record = self._get_record(idempotency_key=idempotency_key) self._save_to_cache(data_record=record) self._validate_payload(data=data, data_record=record) return record
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) ‑> None
-
Save record of function's execution being in progress
Parameters
data
:Dict[str, Any]
- Payload
remaining_time_in_millis
:Optional[int]
- If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
Expand source code
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None: """ Save record of function's execution being in progress Parameters ---------- data: Dict[str, Any] Payload remaining_time_in_millis: Optional[int] If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis """ data_record = DataRecord( idempotency_key=self._get_hashed_idempotency_key(data=data), status=STATUS_CONSTANTS["INPROGRESS"], expiry_timestamp=self._get_expiry_timestamp(), payload_hash=self._get_hashed_payload(data=data), ) if remaining_time_in_millis: now = datetime.datetime.now() period = datetime.timedelta(milliseconds=remaining_time_in_millis) timestamp = (now + period).timestamp() data_record.in_progress_expiry_timestamp = int(timestamp * 1000) else: warnings.warn( "Couldn't determine the remaining time left. " "Did you call register_lambda_context on IdempotencyConfig?" ) logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}") if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key): raise IdempotencyItemAlreadyExistsError self._put_record(data_record=data_record)
def save_success(self, data: Dict[str, Any], result: dict) ‑> None
-
Save record of function's execution completing successfully
Parameters
data
:Dict[str, Any]
- Payload
result
:dict
- The response from function
Expand source code
def save_success(self, data: Dict[str, Any], result: dict) -> None: """ Save record of function's execution completing successfully Parameters ---------- data: Dict[str, Any] Payload result: dict The response from function """ response_data = json.dumps(result, cls=Encoder, sort_keys=True) data_record = DataRecord( idempotency_key=self._get_hashed_idempotency_key(data=data), status=STATUS_CONSTANTS["COMPLETED"], expiry_timestamp=self._get_expiry_timestamp(), response_data=response_data, payload_hash=self._get_hashed_payload(data=data), ) logger.debug( f"Function successfully executed. Saving record to persistence store with " f"idempotency key: {data_record.idempotency_key}" ) self._update_record(data_record=data_record) self._save_to_cache(data_record=data_record)
class DynamoDBPersistenceLayer (table_name: str, key_attr: str = 'id', static_pk_value: Optional[str] = None, sort_key_attr: Optional[str] = None, expiry_attr: str = 'expiration', in_progress_expiry_attr: str = 'in_progress_expiration', status_attr: str = 'status', data_attr: str = 'data', validation_key_attr: str = 'validation', boto_config: Optional[botocore.config.Config] = None, boto3_session: Optional[boto3.session.Session] = None)
-
Abstract Base Class for Idempotency persistence layer.
Initialize the DynamoDB client
Parameters
table_name
:str
- Name of the table to use for storing execution records
key_attr
:str
, optional- DynamoDB attribute name for partition key, by default "id"
static_pk_value
:str
, optional- DynamoDB attribute value for partition key, by default "idempotency#
". This will be used if the sort_key_attr is set. sort_key_attr
:str
, optional- DynamoDB attribute name for the sort key
expiry_attr
:str
, optional- DynamoDB attribute name for expiry timestamp, by default "expiration"
in_progress_expiry_attr
:str
, optional- DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
status_attr
:str
, optional- DynamoDB attribute name for status, by default "status"
data_attr
:str
, optional- DynamoDB attribute name for response data, by default "data"
boto_config
:botocore.config.Config
, optional- Botocore configuration to pass during client initialization
boto3_session
:boto3.session.Session
, optional- Boto3 session to use for AWS API communication
Examples
Create a DynamoDB persistence layer with custom settings
>>> from aws_lambda_powertools.utilities.idempotency import ( >>> idempotent, DynamoDBPersistenceLayer >>> ) >>> >>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store") >>> >>> @idempotent(persistence_store=persistence_store) >>> def handler(event, context): >>> return {"StatusCode": 200}
Expand source code
class DynamoDBPersistenceLayer(BasePersistenceLayer): def __init__( self, table_name: str, key_attr: str = "id", static_pk_value: Optional[str] = None, sort_key_attr: Optional[str] = None, expiry_attr: str = "expiration", in_progress_expiry_attr: str = "in_progress_expiration", status_attr: str = "status", data_attr: str = "data", validation_key_attr: str = "validation", boto_config: Optional[Config] = None, boto3_session: Optional[boto3.session.Session] = None, ): """ Initialize the DynamoDB client Parameters ---------- table_name: str Name of the table to use for storing execution records key_attr: str, optional DynamoDB attribute name for partition key, by default "id" static_pk_value: str, optional DynamoDB attribute value for partition key, by default "idempotency#<function-name>". This will be used if the sort_key_attr is set. sort_key_attr: str, optional DynamoDB attribute name for the sort key expiry_attr: str, optional DynamoDB attribute name for expiry timestamp, by default "expiration" in_progress_expiry_attr: str, optional DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration" status_attr: str, optional DynamoDB attribute name for status, by default "status" data_attr: str, optional DynamoDB attribute name for response data, by default "data" boto_config: botocore.config.Config, optional Botocore configuration to pass during client initialization boto3_session : boto3.session.Session, optional Boto3 session to use for AWS API communication Examples -------- **Create a DynamoDB persistence layer with custom settings** >>> from aws_lambda_powertools.utilities.idempotency import ( >>> idempotent, DynamoDBPersistenceLayer >>> ) >>> >>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store") >>> >>> @idempotent(persistence_store=persistence_store) >>> def handler(event, context): >>> return {"StatusCode": 200} """ self._boto_config = boto_config or Config() self._boto3_session = boto3_session or boto3.session.Session() if sort_key_attr == key_attr: raise ValueError(f"key_attr [{key_attr}] and sort_key_attr [{sort_key_attr}] cannot be the same!") if static_pk_value is None: static_pk_value = f"idempotency#{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, '')}" self._table = None self.table_name = table_name self.key_attr = key_attr self.static_pk_value = static_pk_value self.sort_key_attr = sort_key_attr self.expiry_attr = expiry_attr self.in_progress_expiry_attr = in_progress_expiry_attr self.status_attr = status_attr self.data_attr = data_attr self.validation_key_attr = validation_key_attr super(DynamoDBPersistenceLayer, self).__init__() @property def table(self): """ Caching property to store boto3 dynamodb Table resource """ if self._table: return self._table ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config) self._table = ddb_resource.Table(self.table_name) return self._table @table.setter def table(self, table): """ Allow table instance variable to be set directly, primarily for use in tests """ self._table = table def _get_key(self, idempotency_key: str) -> dict: if self.sort_key_attr: return {self.key_attr: self.static_pk_value, self.sort_key_attr: idempotency_key} return {self.key_attr: idempotency_key} def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord: """ Translate raw item records from DynamoDB to DataRecord Parameters ---------- item: Dict[str, Union[str, int]] Item format from dynamodb response Returns ------- DataRecord representation of item """ return DataRecord( idempotency_key=item[self.key_attr], status=item[self.status_attr], expiry_timestamp=item[self.expiry_attr], in_progress_expiry_timestamp=item.get(self.in_progress_expiry_attr), response_data=item.get(self.data_attr), payload_hash=item.get(self.validation_key_attr), ) def _get_record(self, idempotency_key) -> DataRecord: response = self.table.get_item(Key=self._get_key(idempotency_key), ConsistentRead=True) try: item = response["Item"] except KeyError: raise IdempotencyItemNotFoundError return self._item_to_data_record(item) def _put_record(self, data_record: DataRecord) -> None: item = { **self._get_key(data_record.idempotency_key), self.expiry_attr: data_record.expiry_timestamp, self.status_attr: data_record.status, } if data_record.in_progress_expiry_timestamp is not None: item[self.in_progress_expiry_attr] = data_record.in_progress_expiry_timestamp if self.payload_validation_enabled: item[self.validation_key_attr] = data_record.payload_hash now = datetime.datetime.now() try: logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}") # | LOCKED | RETRY if status = "INPROGRESS" | RETRY # |----------------|-------------------------------------------------------|-------------> .... (time) # | Lambda Idempotency Record # | Timeout Timeout # | (in_progress_expiry) (expiry) # Conditions to successfully save a record: # The idempotency key does not exist: # - first time that this invocation key is used # - previous invocation with the same key was deleted due to TTL idempotency_key_not_exist = "attribute_not_exists(#id)" # The idempotency record exists but it's expired: idempotency_expiry_expired = "#expiry < :now" # The status of the record is "INPROGRESS", there is an in-progress expiry timestamp, but it's expired inprogress_expiry_expired = " AND ".join( [ "#status = :inprogress", "attribute_exists(#in_progress_expiry)", "#in_progress_expiry < :now_in_millis", ] ) condition_expression = ( f"{idempotency_key_not_exist} OR {idempotency_expiry_expired} OR ({inprogress_expiry_expired})" ) self.table.put_item( Item=item, ConditionExpression=condition_expression, ExpressionAttributeNames={ "#id": self.key_attr, "#expiry": self.expiry_attr, "#in_progress_expiry": self.in_progress_expiry_attr, "#status": self.status_attr, }, ExpressionAttributeValues={ ":now": int(now.timestamp()), ":now_in_millis": int(now.timestamp() * 1000), ":inprogress": STATUS_CONSTANTS["INPROGRESS"], }, ) except self.table.meta.client.exceptions.ConditionalCheckFailedException: logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}") raise IdempotencyItemAlreadyExistsError def _update_record(self, data_record: DataRecord): logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}") update_expression = "SET #response_data = :response_data, #expiry = :expiry, " "#status = :status" expression_attr_values = { ":expiry": data_record.expiry_timestamp, ":response_data": data_record.response_data, ":status": data_record.status, } expression_attr_names = { "#expiry": self.expiry_attr, "#response_data": self.data_attr, "#status": self.status_attr, } if self.payload_validation_enabled: update_expression += ", #validation_key = :validation_key" expression_attr_values[":validation_key"] = data_record.payload_hash expression_attr_names["#validation_key"] = self.validation_key_attr kwargs = { "Key": self._get_key(data_record.idempotency_key), "UpdateExpression": update_expression, "ExpressionAttributeValues": expression_attr_values, "ExpressionAttributeNames": expression_attr_names, } self.table.update_item(**kwargs) def _delete_record(self, data_record: DataRecord) -> None: logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}") self.table.delete_item(Key=self._get_key(data_record.idempotency_key))
Ancestors
- BasePersistenceLayer
- abc.ABC
Instance variables
var table
-
Caching property to store boto3 dynamodb Table resource
Expand source code
@property def table(self): """ Caching property to store boto3 dynamodb Table resource """ if self._table: return self._table ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config) self._table = ddb_resource.Table(self.table_name) return self._table
Inherited members
class IdempotencyConfig (event_key_jmespath: str = '', payload_validation_jmespath: str = '', jmespath_options: Optional[Dict[~KT, ~VT]] = None, raise_on_no_idempotency_key: bool = False, expires_after_seconds: int = 3600, use_local_cache: bool = False, local_cache_max_items: int = 256, hash_function: str = 'md5', lambda_context: Optional[LambdaContext] = None)
-
Initialize the base persistence layer
Parameters
event_key_jmespath
:str
- A jmespath expression to extract the idempotency key from the event record
payload_validation_jmespath
:str
- A jmespath expression to extract the payload to be validated from the event record
raise_on_no_idempotency_key
:bool
, optional- Raise exception if no idempotency key was found in the request, by default False
expires_after_seconds
:int
- The number of seconds to wait before a record is expired
use_local_cache
:bool
, optional- Whether to locally cache idempotency results, by default False
local_cache_max_items
:int
, optional- Max number of items to store in local cache, by default 1024
hash_function
:str
, optional- Function to use for calculating hashes, by default md5.
lambda_context
:LambdaContext
, optional- Lambda Context containing information about the invocation, function and execution environment.
Expand source code
class IdempotencyConfig: def __init__( self, event_key_jmespath: str = "", payload_validation_jmespath: str = "", jmespath_options: Optional[Dict] = None, raise_on_no_idempotency_key: bool = False, expires_after_seconds: int = 60 * 60, # 1 hour default use_local_cache: bool = False, local_cache_max_items: int = 256, hash_function: str = "md5", lambda_context: Optional[LambdaContext] = None, ): """ Initialize the base persistence layer Parameters ---------- event_key_jmespath: str A jmespath expression to extract the idempotency key from the event record payload_validation_jmespath: str A jmespath expression to extract the payload to be validated from the event record raise_on_no_idempotency_key: bool, optional Raise exception if no idempotency key was found in the request, by default False expires_after_seconds: int The number of seconds to wait before a record is expired use_local_cache: bool, optional Whether to locally cache idempotency results, by default False local_cache_max_items: int, optional Max number of items to store in local cache, by default 1024 hash_function: str, optional Function to use for calculating hashes, by default md5. lambda_context: LambdaContext, optional Lambda Context containing information about the invocation, function and execution environment. """ self.event_key_jmespath = event_key_jmespath self.payload_validation_jmespath = payload_validation_jmespath self.jmespath_options = jmespath_options self.raise_on_no_idempotency_key = raise_on_no_idempotency_key self.expires_after_seconds = expires_after_seconds self.use_local_cache = use_local_cache self.local_cache_max_items = local_cache_max_items self.hash_function = hash_function self.lambda_context: Optional[LambdaContext] = lambda_context def register_lambda_context(self, lambda_context: LambdaContext): """Captures the Lambda context, to calculate the remaining time before the invocation times out""" self.lambda_context = lambda_context
Methods
def register_lambda_context(self, lambda_context: LambdaContext)
-
Captures the Lambda context, to calculate the remaining time before the invocation times out
Expand source code
def register_lambda_context(self, lambda_context: LambdaContext): """Captures the Lambda context, to calculate the remaining time before the invocation times out""" self.lambda_context = lambda_context