Skip to content

Persistence

MODULE DESCRIPTION
base

Persistence layers supporting idempotency

datarecord

Data Class for idempotency records.

dynamodb
redis

base

Persistence layers supporting idempotency

CLASS DESCRIPTION
BasePersistenceLayer

Abstract Base Class for Idempotency persistence layer.

BasePersistenceLayer

BasePersistenceLayer()

Bases: ABC

Abstract Base Class for Idempotency persistence layer.

METHOD DESCRIPTION
configure

Initialize the base persistence layer from the configuration settings

delete_record

Delete record from the persistence store

get_record

Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

save_inprogress

Save record of function's execution being in progress

save_success

Save record of function's execution completing successfully

Source code in aws_lambda_powertools/utilities/idempotency/persistence/base.py
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(self):
    """Initialize the defaults"""
    self.function_name = ""
    self.configured = False
    self.event_key_jmespath: str = ""
    self.event_key_compiled_jmespath = None
    self.jmespath_options: dict | None = None
    self.payload_validation_enabled = False
    self.validation_key_jmespath = None
    self.raise_on_no_idempotency_key = False
    self.expires_after_seconds: int = 60 * 60  # 1 hour default
    self.use_local_cache = False
    self.hash_function = hashlib.md5

configure

configure(
    config: IdempotencyConfig,
    function_name: str | None = None,
    key_prefix: str | None = None,
) -> None

Initialize the base persistence layer from the configuration settings

PARAMETER DESCRIPTION
config

Idempotency configuration settings

TYPE: IdempotencyConfig

function_name

The name of the function being decorated

TYPE: str | None DEFAULT: None

key_prefix

Custom prefix for idempotency key: key_prefix#hash

TYPE: str | None DEFAULT: None

Source code in aws_lambda_powertools/utilities/idempotency/persistence/base.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def configure(
    self,
    config: IdempotencyConfig,
    function_name: str | None = None,
    key_prefix: str | None = None,
) -> None:
    """
    Initialize the base persistence layer from the configuration settings

    Parameters
    ----------
    config: IdempotencyConfig
        Idempotency configuration settings
    function_name: str, Optional
        The name of the function being decorated
    key_prefix: str | Optional
        Custom prefix for idempotency key: key_prefix#hash
    """
    self.function_name = (
        key_prefix or f"{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, 'test-func')}.{function_name or ''}"
    )

    if self.configured:
        # Prevent being reconfigured multiple times
        return
    self.configured = True

    self.event_key_jmespath = config.event_key_jmespath
    if config.event_key_jmespath:
        self.event_key_compiled_jmespath = jmespath.compile(config.event_key_jmespath)
    self.jmespath_options = config.jmespath_options or {"custom_functions": PowertoolsFunctions()}
    if config.payload_validation_jmespath:
        self.validation_key_jmespath = jmespath.compile(config.payload_validation_jmespath)
        self.payload_validation_enabled = True
    self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
    self.expires_after_seconds = config.expires_after_seconds
    self.use_local_cache = config.use_local_cache
    if self.use_local_cache:
        self._cache = LRUDict(max_items=config.local_cache_max_items)
    self.hash_function = getattr(hashlib, config.hash_function)

delete_record

delete_record(data: dict[str, Any], exception: Exception)

Delete record from the persistence store

PARAMETER DESCRIPTION
data

Payload

TYPE: dict[str, Any]

exception

The exception raised by the function

TYPE: Exception

Source code in aws_lambda_powertools/utilities/idempotency/persistence/base.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
def delete_record(self, data: dict[str, Any], exception: Exception):
    """
    Delete record from the persistence store

    Parameters
    ----------
    data: dict[str, Any]
        Payload
    exception
        The exception raised by the function
    """

    idempotency_key = self._get_hashed_idempotency_key(data=data)
    if idempotency_key is None:
        # If the idempotency key is None, no data will be saved in the Persistence Layer.
        # See: https://github.com/aws-powertools/powertools-lambda-python/issues/2465
        return None

    data_record = DataRecord(idempotency_key=idempotency_key)

    logger.debug(
        f"Function raised an exception ({type(exception).__name__}). Clearing in progress record in persistence "
        f"store for idempotency key: {data_record.idempotency_key}",
    )
    self._delete_record(data_record=data_record)

    self._delete_from_cache(idempotency_key=data_record.idempotency_key)

get_record

get_record(data: dict[str, Any]) -> DataRecord | None

Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

PARAMETER DESCRIPTION
data

Payload

TYPE: dict[str, Any]

RETURNS DESCRIPTION
DataRecord

DataRecord representation of existing record found in persistence store

RAISES DESCRIPTION
IdempotencyItemNotFoundError

Exception raised if no record exists in persistence store with the idempotency key

IdempotencyValidationError

Payload doesn't match the stored record for the given idempotency key

Source code in aws_lambda_powertools/utilities/idempotency/persistence/base.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
def get_record(self, data: dict[str, Any]) -> DataRecord | None:
    """
    Retrieve idempotency key for data provided, fetch from persistence store, and convert to DataRecord.

    Parameters
    ----------
    data: dict[str, Any]
        Payload

    Returns
    -------
    DataRecord
        DataRecord representation of existing record found in persistence store

    Raises
    ------
    IdempotencyItemNotFoundError
        Exception raised if no record exists in persistence store with the idempotency key
    IdempotencyValidationError
        Payload doesn't match the stored record for the given idempotency key
    """

    idempotency_key = self._get_hashed_idempotency_key(data=data)
    if idempotency_key is None:
        # If the idempotency key is None, no data will be saved in the Persistence Layer.
        # See: https://github.com/aws-powertools/powertools-lambda-python/issues/2465
        return None

    cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
    if cached_record:
        logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
        self._validate_payload(data_payload=data, stored_data_record=cached_record)
        return cached_record

    record = self._get_record(idempotency_key=idempotency_key)

    self._validate_payload(data_payload=data, stored_data_record=record)
    self._save_to_cache(data_record=record)

    return record

save_inprogress

save_inprogress(
    data: dict[str, Any],
    remaining_time_in_millis: int | None = None,
) -> None

Save record of function's execution being in progress

PARAMETER DESCRIPTION
data

Payload

TYPE: dict[str, Any]

remaining_time_in_millis

If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis

TYPE: int | None DEFAULT: None

Source code in aws_lambda_powertools/utilities/idempotency/persistence/base.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def save_inprogress(self, data: dict[str, Any], remaining_time_in_millis: int | None = None) -> None:
    """
    Save record of function's execution being in progress

    Parameters
    ----------
    data: dict[str, Any]
        Payload
    remaining_time_in_millis: int | None
        If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
    """

    idempotency_key = self._get_hashed_idempotency_key(data=data)
    if idempotency_key is None:
        # If the idempotency key is None, no data will be saved in the Persistence Layer.
        # See: https://github.com/aws-powertools/powertools-lambda-python/issues/2465
        return None

    data_record = DataRecord(
        idempotency_key=idempotency_key,
        status=STATUS_CONSTANTS["INPROGRESS"],
        expiry_timestamp=self._get_expiry_timestamp(),
        payload_hash=self._get_hashed_payload(data=data),
    )

    # When Lambda kills the container after timeout, the remaining_time_in_millis is 0, which is considered False.
    # Therefore, we need to check if remaining_time_in_millis is not None (>=0) to handle this case.
    # See: https://github.com/aws-powertools/powertools-lambda-python/issues/4759
    if remaining_time_in_millis is not None:
        now = datetime.datetime.now()
        period = datetime.timedelta(milliseconds=remaining_time_in_millis)
        timestamp = (now + period).timestamp()
        data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
    else:
        warnings.warn(
            "Couldn't determine the remaining time left. "
            "Did you call register_lambda_context on IdempotencyConfig?",
            stacklevel=2,
        )

    logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

    if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
        raise IdempotencyItemAlreadyExistsError

    self._put_record(data_record=data_record)

save_success

save_success(data: dict[str, Any], result: dict) -> None

Save record of function's execution completing successfully

PARAMETER DESCRIPTION
data

Payload

TYPE: dict[str, Any]

result

The response from function

TYPE: dict

Source code in aws_lambda_powertools/utilities/idempotency/persistence/base.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def save_success(self, data: dict[str, Any], result: dict) -> None:
    """
    Save record of function's execution completing successfully

    Parameters
    ----------
    data: dict[str, Any]
        Payload
    result: dict
        The response from function
    """
    idempotency_key = self._get_hashed_idempotency_key(data=data)
    if idempotency_key is None:
        # If the idempotency key is None, no data will be saved in the Persistence Layer.
        # See: https://github.com/aws-powertools/powertools-lambda-python/issues/2465
        return None

    response_data = json.dumps(result, cls=Encoder, sort_keys=True)

    data_record = DataRecord(
        idempotency_key=idempotency_key,
        status=STATUS_CONSTANTS["COMPLETED"],
        expiry_timestamp=self._get_expiry_timestamp(),
        response_data=response_data,
        payload_hash=self._get_hashed_payload(data=data),
    )
    logger.debug(
        f"Function successfully executed. Saving record to persistence store with "
        f"idempotency key: {data_record.idempotency_key}",
    )
    self._update_record(data_record=data_record)

    self._save_to_cache(data_record=data_record)

datarecord

Data Class for idempotency records.

CLASS DESCRIPTION
DataRecord

Data Class for idempotency records.

DataRecord

DataRecord(
    idempotency_key: str,
    status: str = "",
    expiry_timestamp: int | None = None,
    in_progress_expiry_timestamp: int | None = None,
    response_data: str = "",
    payload_hash: str = "",
)

Data Class for idempotency records.

METHOD DESCRIPTION
get_expiration_datetime

Converts the expiry timestamp to a datetime object.

response_json_as_dict

Get response data deserialized to python dict

ATTRIBUTE DESCRIPTION
is_expired

Check if data record is expired

TYPE: bool

status

Get status of data record

TYPE: str

Source code in aws_lambda_powertools/utilities/idempotency/persistence/datarecord.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(
    self,
    idempotency_key: str,
    status: str = "",
    expiry_timestamp: int | None = None,
    in_progress_expiry_timestamp: int | None = None,
    response_data: str = "",
    payload_hash: str = "",
) -> None:
    """

    Parameters
    ----------
    idempotency_key: str
        hashed representation of the idempotent data
    status: str, optional
        status of the idempotent record
    expiry_timestamp: int, optional
        time before the record should expire, in seconds
    in_progress_expiry_timestamp: int, optional
        time before the record should expire while in the INPROGRESS state, in seconds
    payload_hash: str, optional
        hashed representation of payload
    response_data: str, optional
        response data from previous executions using the record
    """
    self.idempotency_key = idempotency_key
    self.payload_hash = payload_hash
    self.expiry_timestamp = expiry_timestamp
    self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
    self._status = status
    self.response_data = response_data

is_expired property

is_expired: bool

Check if data record is expired

RETURNS DESCRIPTION
bool

Whether the record is currently expired or not

status property

status: str

Get status of data record

RETURNS DESCRIPTION
str

get_expiration_datetime

get_expiration_datetime() -> datetime.datetime | None

Converts the expiry timestamp to a datetime object.

This method checks if an expiry timestamp exists and converts it to a datetime object. If no timestamp is present, it returns None.

datetime.datetime | None A datetime object representing the expiration time, or None if no expiry timestamp is set.

The returned datetime object is timezone-naive and assumes the timestamp is in the system's local timezone. Lambda default timezone is UTC.

Source code in aws_lambda_powertools/utilities/idempotency/persistence/datarecord.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def get_expiration_datetime(self) -> datetime.datetime | None:
    """
    Converts the expiry timestamp to a datetime object.

    This method checks if an expiry timestamp exists and converts it to a
    datetime object. If no timestamp is present, it returns None.

    Returns:
    -------
    datetime.datetime | None
        A datetime object representing the expiration time, or None if no expiry timestamp is set.

    Note:
    ----
    The returned datetime object is timezone-naive and assumes the timestamp
    is in the system's local timezone. Lambda default timezone is UTC.
    """
    if self.expiry_timestamp:
        return datetime.datetime.fromtimestamp(int(self.expiry_timestamp))
    return None

response_json_as_dict

response_json_as_dict() -> dict | None

Get response data deserialized to python dict

RETURNS DESCRIPTION
dict | None

previous response data deserialized

Source code in aws_lambda_powertools/utilities/idempotency/persistence/datarecord.py
85
86
87
88
89
90
91
92
93
94
def response_json_as_dict(self) -> dict | None:
    """
    Get response data deserialized to python dict

    Returns
    -------
    dict | None
        previous response data deserialized
    """
    return json.loads(self.response_data) if self.response_data else None

dynamodb

CLASS DESCRIPTION
DynamoDBPersistenceLayer

DynamoDBPersistenceLayer

DynamoDBPersistenceLayer(
    table_name: str,
    key_attr: str = "id",
    static_pk_value: str | None = None,
    sort_key_attr: str | None = None,
    expiry_attr: str = "expiration",
    in_progress_expiry_attr: str = "in_progress_expiration",
    status_attr: str = "status",
    data_attr: str = "data",
    validation_key_attr: str = "validation",
    boto_config: Config | None = None,
    boto3_session: boto3.session.Session | None = None,
    boto3_client: DynamoDBClient | None = None,
)

Bases: BasePersistenceLayer

PARAMETER DESCRIPTION
table_name

Name of the table to use for storing execution records

TYPE: str

key_attr

DynamoDB attribute name for partition key, by default "id"

TYPE: str DEFAULT: 'id'

static_pk_value

DynamoDB attribute value for partition key, by default "idempotency#". This will be used if the sort_key_attr is set.

TYPE: str | None DEFAULT: None

sort_key_attr

DynamoDB attribute name for the sort key

TYPE: str | None DEFAULT: None

expiry_attr

DynamoDB attribute name for expiry timestamp, by default "expiration"

TYPE: str DEFAULT: 'expiration'

in_progress_expiry_attr

DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"

TYPE: str DEFAULT: 'in_progress_expiration'

status_attr

DynamoDB attribute name for status, by default "status"

TYPE: str DEFAULT: 'status'

data_attr

DynamoDB attribute name for response data, by default "data"

TYPE: str DEFAULT: 'data'

validation_key_attr

DynamoDB attribute name for hashed representation of the parts of the event used for validation

TYPE: str DEFAULT: 'validation'

boto_config

Botocore configuration to pass during client initialization

TYPE: Config | None DEFAULT: None

boto3_session

Boto3 session to use for AWS API communication

TYPE: Session DEFAULT: None

boto3_client

Boto3 DynamoDB Client to use, boto3_session and boto_config will be ignored if both are provided

TYPE: DynamoDBClient DEFAULT: None

Example

Create a DynamoDB persistence layer with custom settings

1
2
3
4
5
6
7
8
9
>>> from aws_lambda_powertools.utilities.idempotency import (
>>>    idempotent, DynamoDBPersistenceLayer
>>> )
>>>
>>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store")
>>>
>>> @idempotent(persistence_store=persistence_store)
>>> def handler(event, context):
>>>     return {"StatusCode": 200}
METHOD DESCRIPTION
boto3_supports_condition_check_failure

Check if the installed boto3 version supports condition check failure.

Source code in aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def __init__(
    self,
    table_name: str,
    key_attr: str = "id",
    static_pk_value: str | None = None,
    sort_key_attr: str | None = None,
    expiry_attr: str = "expiration",
    in_progress_expiry_attr: str = "in_progress_expiration",
    status_attr: str = "status",
    data_attr: str = "data",
    validation_key_attr: str = "validation",
    boto_config: Config | None = None,
    boto3_session: boto3.session.Session | None = None,
    boto3_client: DynamoDBClient | None = None,
):
    """
    Initialize the DynamoDB client

    Parameters
    ----------
    table_name: str
        Name of the table to use for storing execution records
    key_attr: str, optional
        DynamoDB attribute name for partition key, by default "id"
    static_pk_value: str, optional
        DynamoDB attribute value for partition key, by default "idempotency#<function-name>".
        This will be used if the sort_key_attr is set.
    sort_key_attr: str, optional
        DynamoDB attribute name for the sort key
    expiry_attr: str, optional
        DynamoDB attribute name for expiry timestamp, by default "expiration"
    in_progress_expiry_attr: str, optional
        DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
    status_attr: str, optional
        DynamoDB attribute name for status, by default "status"
    data_attr: str, optional
        DynamoDB attribute name for response data, by default "data"
    validation_key_attr: str, optional
        DynamoDB attribute name for hashed representation of the parts of the event used for validation
    boto_config: botocore.config.Config, optional
        Botocore configuration to pass during client initialization
    boto3_session : boto3.session.Session, optional
        Boto3 session to use for AWS API communication
    boto3_client : DynamoDBClient, optional
        Boto3 DynamoDB Client to use, boto3_session and boto_config will be ignored if both are provided

    Example
    --------
    **Create a DynamoDB persistence layer with custom settings**

        >>> from aws_lambda_powertools.utilities.idempotency import (
        >>>    idempotent, DynamoDBPersistenceLayer
        >>> )
        >>>
        >>> persistence_store = DynamoDBPersistenceLayer(table_name="idempotency_store")
        >>>
        >>> @idempotent(persistence_store=persistence_store)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}
    """
    if boto3_client is None:
        boto3_session = boto3_session or boto3.session.Session()
        boto3_client = boto3_session.client("dynamodb", config=boto_config)
    self.client = boto3_client

    user_agent.register_feature_to_client(client=self.client, feature="idempotency")

    if sort_key_attr == key_attr:
        raise ValueError(f"key_attr [{key_attr}] and sort_key_attr [{sort_key_attr}] cannot be the same!")

    if static_pk_value is None:
        static_pk_value = f"idempotency#{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, '')}"

    self.table_name = table_name
    self.key_attr = key_attr
    self.static_pk_value = static_pk_value
    self.sort_key_attr = sort_key_attr
    self.expiry_attr = expiry_attr
    self.in_progress_expiry_attr = in_progress_expiry_attr
    self.status_attr = status_attr
    self.data_attr = data_attr
    self.validation_key_attr = validation_key_attr

    # Use DynamoDB's ReturnValuesOnConditionCheckFailure to optimize put and get operations and optimize costs.
    # This feature is supported in boto3 versions 1.26.164 and later.
    self.return_value_on_condition = (
        {"ReturnValuesOnConditionCheckFailure": "ALL_OLD"}
        if self.boto3_supports_condition_check_failure(boto3.__version__)
        else {}
    )

    self._deserializer = TypeDeserializer()

    super().__init__()

boto3_supports_condition_check_failure staticmethod

boto3_supports_condition_check_failure(
    boto3_version: str,
) -> bool

Check if the installed boto3 version supports condition check failure.

Params
RETURNS DESCRIPTION
bool

True if the boto3 version supports condition check failure, False otherwise.

Source code in aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
@staticmethod
def boto3_supports_condition_check_failure(boto3_version: str) -> bool:
    """
    Check if the installed boto3 version supports condition check failure.

    Params
    ------
    boto3_version: str
        The boto3 version

    Returns
    -------
    bool
        True if the boto3 version supports condition check failure, False otherwise.
    """
    # Only supported in boto3 1.26.164 and above
    major, minor, *patch = map(int, boto3_version.split("."))
    return (major, minor, *patch) >= (1, 26, 164)

redis

CLASS DESCRIPTION
RedisCachePersistenceLayer
RedisClientProtocol

Protocol class defining the interface for a Redis client.

RedisConnection

RedisCachePersistenceLayer

RedisCachePersistenceLayer(
    url: str = "",
    host: str = "",
    port: int = 6379,
    username: str = "",
    password: str = "",
    db_index: int = 0,
    mode: Literal["standalone", "cluster"] = "standalone",
    ssl: bool = True,
    client: RedisClientProtocol | None = None,
    in_progress_expiry_attr: str = "in_progress_expiration",
    expiry_attr: str = "expiration",
    status_attr: str = "status",
    data_attr: str = "data",
    validation_key_attr: str = "validation",
)

Bases: BasePersistenceLayer

PARAMETER DESCRIPTION
host

Redis host

TYPE: str DEFAULT: ''

port

Redis port

TYPE: int DEFAULT: 6379

username

Redis username

TYPE: str DEFAULT: ''

password

Redis password

TYPE: str DEFAULT: ''

url

Redis connection string, using url will override the host/port in the previous parameters

TYPE: str DEFAULT: ''

db_index

Redis db index

TYPE: int DEFAULT: 0

mode

set Redis client mode, choose from standalone/cluster

TYPE: Literal['standalone', 'cluster'] DEFAULT: 'standalone'

ssl

set whether to use ssl for Redis connection

TYPE: bool DEFAULT: True

client

Bring your own Redis client that follows RedisClientProtocol. If provided, all other connection configuration options will be ignored

TYPE: RedisClientProtocol | None DEFAULT: None

expiry_attr

Redis json attribute name for expiry timestamp, by default "expiration"

TYPE: str DEFAULT: 'expiration'

in_progress_expiry_attr

Redis json attribute name for in-progress expiry timestamp, by default "in_progress_expiration"

TYPE: str DEFAULT: 'in_progress_expiration'

status_attr

Redis json attribute name for status, by default "status"

TYPE: str DEFAULT: 'status'

data_attr

Redis json attribute name for response data, by default "data"

TYPE: str DEFAULT: 'data'

validation_key_attr

Redis json attribute name for hashed representation of the parts of the event used for validation

TYPE: str DEFAULT: 'validation'

Examples:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from redis import Redis
from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)

from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)

client = redis.Redis(
    host="localhost",
    port="6379",
    decode_responses=True,
)
persistence_layer = RedisCachePersistenceLayer(client=client)

@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    print("expensive operation")
    return {
        "payment_id": 12345,
        "message": "success",
        "statusCode": 200,
    }
Source code in aws_lambda_powertools/utilities/idempotency/persistence/redis.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
def __init__(
    self,
    url: str = "",
    host: str = "",
    port: int = 6379,
    username: str = "",
    password: str = "",  # nosec - password for Redis connection
    db_index: int = 0,
    mode: Literal["standalone", "cluster"] = "standalone",
    ssl: bool = True,
    client: RedisClientProtocol | None = None,
    in_progress_expiry_attr: str = "in_progress_expiration",
    expiry_attr: str = "expiration",
    status_attr: str = "status",
    data_attr: str = "data",
    validation_key_attr: str = "validation",
):
    """
    Initialize the Redis Persistence Layer

    Parameters
    ----------
    host: str, optional
        Redis host
    port: int, optional: default 6379
        Redis port
    username: str, optional
        Redis username
    password: str, optional
        Redis password
    url: str, optional
        Redis connection string, using url will override the host/port in the previous parameters
    db_index: int, optional: default 0
        Redis db index
    mode: str, Literal["standalone","cluster"]
        set Redis client mode, choose from standalone/cluster
    ssl: bool, optional: default True
        set whether to use ssl for Redis connection
    client: RedisClientProtocol, optional
        Bring your own Redis client that follows RedisClientProtocol.
        If provided, all other connection configuration options will be ignored
    expiry_attr: str, optional
        Redis json attribute name for expiry timestamp, by default "expiration"
    in_progress_expiry_attr: str, optional
        Redis json attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
    status_attr: str, optional
        Redis json attribute name for status, by default "status"
    data_attr: str, optional
        Redis json attribute name for response data, by default "data"
    validation_key_attr: str, optional
        Redis json attribute name for hashed representation of the parts of the event used for validation

    Examples
    --------

    ```python
    from redis import Redis
    from aws_lambda_powertools.utilities.idempotency import (
        idempotent,
    )

    from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
        RedisCachePersistenceLayer,
    )

    client = redis.Redis(
        host="localhost",
        port="6379",
        decode_responses=True,
    )
    persistence_layer = RedisCachePersistenceLayer(client=client)

    @idempotent(persistence_store=persistence_layer)
    def lambda_handler(event: dict, context: LambdaContext):
        print("expensive operation")
        return {
            "payment_id": 12345,
            "message": "success",
            "statusCode": 200,
        }
    ```
    """

    # Initialize Redis client with Redis config if no client is passed in
    if client is None:
        self.client = RedisConnection(
            host=host,
            port=port,
            username=username,
            password=password,
            db_index=db_index,
            url=url,
            mode=mode,
            ssl=ssl,
        )._init_client()
    else:
        self.client = client

    self.in_progress_expiry_attr = in_progress_expiry_attr
    self.expiry_attr = expiry_attr
    self.status_attr = status_attr
    self.data_attr = data_attr
    self.validation_key_attr = validation_key_attr
    self._json_serializer = json.dumps
    self._json_deserializer = json.loads
    super().__init__()
    self._orphan_lock_timeout = min(10, self.expires_after_seconds)

RedisClientProtocol

Bases: Protocol

Protocol class defining the interface for a Redis client.

This protocol outlines the expected behavior of a Redis client, allowing for standardization among different implementations and allowing customers to extend it in their own implementation.

METHOD DESCRIPTION
- get

Retrieves the value associated with the given key.

- set
) -> bool | None:

Sets the value for the specified key with optional parameters.

- delete

Deletes one or more keys.

Note
  • The ex parameter represents the expiration time in seconds.
  • The px parameter represents the expiration time in milliseconds.
  • The nx parameter, if True, sets the value only if the key does not exist.
RAISES DESCRIPTION
- NotImplementedError: If any of the methods are not implemented by the concrete class.

RedisConnection

RedisConnection(
    url: str = "",
    host: str = "",
    port: int = 6379,
    username: str = "",
    password: str = "",
    db_index: int = 0,
    mode: Literal["standalone", "cluster"] = "standalone",
    ssl: bool = True,
)
PARAMETER DESCRIPTION
host

Redis host

TYPE: str DEFAULT: ''

port

Redis port

TYPE: int DEFAULT: 6379

username

Redis username

TYPE: str DEFAULT: ''

password

Redis password

TYPE: str DEFAULT: ''

url

Redis connection string, using url will override the host/port in the previous parameters

TYPE: str DEFAULT: ''

db_index

Redis db index

TYPE: int DEFAULT: 0

mode

set Redis client mode, choose from standalone/cluster. The default is standalone

TYPE: Literal['standalone', 'cluster'] DEFAULT: 'standalone'

ssl

set whether to use ssl for Redis connection

TYPE: bool DEFAULT: True

Example
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from dataclasses import dataclass, field
from uuid import uuid4

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)

from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = RedisCachePersistenceLayer(host="localhost", port=6379)


@dataclass
class Payment:
    user_id: str
    product_id: str
    payment_id: str = field(default_factory=lambda: f"{uuid4()}")


class PaymentError(Exception):
    ...


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        payment: Payment = create_subscription_payment(event)
        return {
            "payment_id": payment.payment_id,
            "message": "success",
            "statusCode": 200,
        }
    except Exception as exc:
        raise PaymentError(f"Error creating payment {str(exc)}")


def create_subscription_payment(event: dict) -> Payment:
    return Payment(**event)
Source code in aws_lambda_powertools/utilities/idempotency/persistence/redis.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def __init__(
    self,
    url: str = "",
    host: str = "",
    port: int = 6379,
    username: str = "",
    password: str = "",  # nosec - password for Redis connection
    db_index: int = 0,
    mode: Literal["standalone", "cluster"] = "standalone",
    ssl: bool = True,
) -> None:
    """
    Initialize Redis connection which will be used in Redis persistence_store to support Idempotency

    Parameters
    ----------
    host: str, optional
        Redis host
    port: int, optional: default 6379
        Redis port
    username: str, optional
        Redis username
    password: str, optional
        Redis password
    url: str, optional
        Redis connection string, using url will override the host/port in the previous parameters
    db_index: int, optional: default 0
        Redis db index
    mode: str, Literal["standalone","cluster"]
        set Redis client mode, choose from standalone/cluster. The default is standalone
    ssl: bool, optional: default True
        set whether to use ssl for Redis connection

    Example
    --------

    ```python
    from dataclasses import dataclass, field
    from uuid import uuid4

    from aws_lambda_powertools.utilities.idempotency import (
        idempotent,
    )
    from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
        RedisCachePersistenceLayer,
    )

    from aws_lambda_powertools.utilities.typing import LambdaContext

    persistence_layer = RedisCachePersistenceLayer(host="localhost", port=6379)


    @dataclass
    class Payment:
        user_id: str
        product_id: str
        payment_id: str = field(default_factory=lambda: f"{uuid4()}")


    class PaymentError(Exception):
        ...


    @idempotent(persistence_store=persistence_layer)
    def lambda_handler(event: dict, context: LambdaContext):
        try:
            payment: Payment = create_subscription_payment(event)
            return {
                "payment_id": payment.payment_id,
                "message": "success",
                "statusCode": 200,
            }
        except Exception as exc:
            raise PaymentError(f"Error creating payment {str(exc)}")


    def create_subscription_payment(event: dict) -> Payment:
        return Payment(**event)

    ```
    """
    self.url = url
    self.host = host
    self.port = port
    self.username = username
    self.password = password
    self.db_index = db_index
    self.ssl = ssl
    self.mode = mode