Idempotency

The idempotency utility provides a simple solution to convert your Lambda functions into idempotent operations which are safe to retry.

Key features

  • Prevent Lambda handler from executing more than once on the same event payload during a time window
  • Ensure Lambda handler returns the same result when called with the same payload
  • Select a subset of the event as the idempotency key using JMESPath expressions
  • Set a time window in which records with the same payload should be considered duplicates
  • Expire in-progress executions if the Lambda function times out halfway through
  • Support Amazon DynamoDB and Redis as persistence layers

Terminology

The property of idempotency means that an operation does not cause additional side effects if it is called more than once with the same input parameters.

Idempotent operations will return the same result when they are called multiple times with the same parameters. This makes idempotent operations safe to retry.

Idempotency key is a hash representation of either the entire event or a specific configured subset of the event, and invocation results are JSON serialized and stored in your persistence storage layer.
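
As a rough illustration only (not the library's exact implementation), you can think of the idempotency key as a hash of the deterministically serialized payload, or of the subset of it you configured:

import hashlib
import json

# Hypothetical sketch: the utility does this for you; md5 is the default hash function (configurable)
event = {"user_id": "xyz", "product_id": "123456789"}
idempotency_key = hashlib.md5(json.dumps(event, sort_keys=True).encode()).hexdigest()
print(idempotency_key)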

Idempotency record is the data representation of an idempotent request saved in your preferred storage layer. We use it to coordinate whether a request is idempotent, whether it's still valid or expired based on timestamps, etc.

classDiagram
    direction LR
    class IdempotencyRecord {
        idempotency_key str
        status Status
        expiry_timestamp int
        in_progress_expiry_timestamp int
        response_data Json~str~
        payload_hash str
    }
    class Status {
        <<Enumeration>>
        INPROGRESS
        COMPLETE
        EXPIRED internal_only
    }
    IdempotencyRecord -- Status

Idempotency record representation

Getting started

Note

This section uses DynamoDB as the default idempotent persistence storage layer. If you are interested in using Redis as the persistence storage layer, check out the Redis as persistence storage layer Section.

IAM Permissions

Your Lambda function IAM Role must have dynamodb:GetItem, dynamodb:PutItem, dynamodb:UpdateItem and dynamodb:DeleteItem IAM permissions before using this feature.

Note

If you're using our example AWS Serverless Application Model (SAM), AWS Cloud Development Kit (CDK), or Terraform templates, the required permissions are already added.

Required resources

Before getting started, you need to create a persistent storage layer where the idempotency utility can store its state - your Lambda functions will need read and write access to it.

We currently support Amazon DynamoDB and Redis as storage layers. The following example demonstrates how to create a table in DynamoDB. If you prefer to use Redis, go to the RedisPersistenceLayer section.

Default table configuration

If you're not changing the default configuration for the DynamoDB persistence layer, this is the expected default configuration:

Configuration | Value | Notes
Partition key | id |
TTL attribute name | expiration | This can only be configured after your table is created if you're using AWS Console
Tip: You can share a single state table for all functions

You can reuse the same DynamoDB table to store idempotency state. In addition to the idempotency key, we add the module name and the qualified name of decorated classes and functions to the hash key, so entries from different functions don't collide.
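
For illustration, assuming a handler defined in my_module, an item in a shared table might look roughly like the hypothetical sketch below (default attribute names; the exact key format is an internal detail):

# Hypothetical item stored in a shared idempotency table
item = {
    "id": "my_module.lambda_handler#<hash-of-payload>",  # qualified function name + idempotency key
    "expiration": 1700000000,  # TTL attribute (Unix timestamp)
    "status": "COMPLETE",
    "data": '{"payment_id": "...", "message": "success", "statusCode": 200}',
}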

Transform: AWS::Serverless-2016-10-31
Resources:
  IdempotencyTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S
      KeySchema:
        - AttributeName: id
          KeyType: HASH
      TimeToLiveSpecification:
        AttributeName: expiration
        Enabled: true
      BillingMode: PAY_PER_REQUEST

  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.12
      Handler: app.py
      Policies:
        - Statement:
          - Sid: AllowDynamodbReadWrite
            Effect: Allow
            Action:
              - dynamodb:PutItem
              - dynamodb:GetItem
              - dynamodb:UpdateItem
              - dynamodb:DeleteItem
            Resource: !GetAtt IdempotencyTable.Arn
from aws_cdk import RemovalPolicy
from aws_cdk import aws_dynamodb as dynamodb
from aws_cdk import aws_iam as iam
from constructs import Construct


class IdempotencyConstruct(Construct):
    def __init__(self, scope: Construct, name: str, lambda_role: iam.Role) -> None:
        super().__init__(scope, name)
        self.idempotency_table = dynamodb.Table(
            self,
            "IdempotencyTable",
            partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            removal_policy=RemovalPolicy.DESTROY,
            time_to_live_attribute="expiration",
            point_in_time_recovery=True,
        )
        self.idempotency_table.grant(
            lambda_role,
            "dynamodb:PutItem",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:DeleteItem",
        )
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.0"
    }
  }
}

provider "aws" {
  region = "us-east-1" # Replace with your desired AWS region
}

resource "aws_dynamodb_table" "IdempotencyTable" {
  name         = "IdempotencyTable"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "id"
  attribute {
    name = "id"
    type = "S"
  }
  ttl {
    attribute_name = "expiration"
    enabled        = true
  }
}

resource "aws_lambda_function" "IdempotencyFunction" {
  function_name = "IdempotencyFunction"
  role          = aws_iam_role.IdempotencyFunctionRole.arn
  runtime       = "python3.12"
  handler       = "app.lambda_handler"
  filename      = "lambda.zip"

}

resource "aws_iam_role" "IdempotencyFunctionRole" {
  name = "IdempotencyFunctionRole"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = ""
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
        Action = "sts:AssumeRole"
      },
    ]
  })
}

resource "aws_iam_policy" "LambdaDynamoDBPolicy" {
  name        = "LambdaDynamoDBPolicy"
  description = "IAM policy for Lambda function to access DynamoDB"
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AllowDynamodbReadWrite"
        Effect = "Allow"
        Action = [
          "dynamodb:PutItem",
          "dynamodb:GetItem",
          "dynamodb:UpdateItem",
          "dynamodb:DeleteItem",
        ]
        Resource = aws_dynamodb_table.IdempotencyTable.arn
      },
    ]
  })
}

resource "aws_iam_role_policy_attachment" "IdempotencyFunctionRoleAttachment" {
  role       = aws_iam_role.IdempotencyFunctionRole.name
  policy_arn = aws_iam_policy.LambdaDynamoDBPolicy.arn
}
Warning: Large responses with DynamoDB persistence layer

When using this utility with DynamoDB, your function's responses must be smaller than 400 KB.

Larger items cannot be written to DynamoDB and will cause exceptions. If your response exceeds 400 KB, consider using Redis as your persistence layer.

Info: DynamoDB

During the first invocation with a given payload, the Lambda function executes both a PutItem and an UpdateItem operation to store the data in DynamoDB. If the result returned by your Lambda is less than 1 KB, you can expect 2 WCUs per Lambda invocation.

On subsequent invocations with the same payload, you can expect just 1 PutItem request to DynamoDB.

Note: While we try to minimize requests to DynamoDB to 1 per invocation, if your boto3 version is lower than 1.26.194, you may experience 2 requests in every invocation. Make sure to check your boto3 version and review the DynamoDB pricing documentation to estimate the cost.

Idempotent decorator

You can quickly start by initializing the DynamoDBPersistenceLayer class and using it with the idempotent decorator on your Lambda handler.

Note

In this example, the entire Lambda handler is treated as a single idempotent operation. If your Lambda handler can cause multiple side effects, or you're only interested in making a specific piece of logic idempotent, use idempotent_function instead.

See Choosing a payload subset for idempotency for more elaborate use cases.

from dataclasses import dataclass, field
from uuid import uuid4

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")


@dataclass
class Payment:
    user_id: str
    product_id: str
    payment_id: str = field(default_factory=lambda: f"{uuid4()}")


class PaymentError(Exception):
    ...


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        payment: Payment = create_subscription_payment(event)
        return {
            "payment_id": payment.payment_id,
            "message": "success",
            "statusCode": 200,
        }
    except Exception as exc:
        raise PaymentError(f"Error creating payment {str(exc)}")


def create_subscription_payment(event: dict) -> Payment:
    return Payment(**event)
{
  "user_id": "xyz",
  "product_id": "123456789"
}

After processing this request successfully, a second request containing the exact same payload above will now return the same response, ensuring our customer isn't charged twice.

New to the idempotency concept? Please review our Terminology section if you haven't yet.

Idempotent_function decorator

Similar to the idempotent decorator, you can use the idempotent_function decorator for any synchronous Python function.

When using idempotent_function, you must tell us which keyword parameter in your function signature has the data we should use via data_keyword_argument.

We support JSON serializable data, Python Dataclasses, Parser/Pydantic Models, and our Event Source Data Classes.

Limitation

Make sure to call your decorated function using keyword arguments.

from dataclasses import dataclass

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(event_key_jmespath="order_id")  # see Choosing a payload subset section


@dataclass
class OrderItem:
    sku: str
    description: str


@dataclass
class Order:
    item: OrderItem
    order_id: int


@idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb)
def process_order(order: Order):
    return f"processed order {order.order_id}"


def lambda_handler(event: dict, context: LambdaContext):
    config.register_lambda_context(context)  # see Lambda timeouts section
    order_item = OrderItem(sku="fake", description="sample")
    order = Order(item=order_item, order_id=1)

    # `order` parameter must be called as a keyword argument to work
    process_order(order=order)
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.parser import BaseModel
from aws_lambda_powertools.utilities.typing import LambdaContext

dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(event_key_jmespath="order_id")  # see Choosing a payload subset section


class OrderItem(BaseModel):
    sku: str
    description: str


class Order(BaseModel):
    item: OrderItem
    order_id: int


@idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb)
def process_order(order: Order):
    return f"processed order {order.order_id}"


def lambda_handler(event: dict, context: LambdaContext):
    config.register_lambda_context(context)  # see Lambda timeouts section
    order_item = OrderItem(sku="fake", description="sample")
    order = Order(item=order_item, order_id=1)

    # `order` parameter must be called as a keyword argument to work
    process_order(order=order)

Output serialization

By default, idempotent_function serializes, stores, and returns your annotated function's result as a JSON object. You can change this behavior using the output_serializer parameter.

The output serializer supports any JSON serializable data, Python Dataclasses and Pydantic Models.

When using the output_serializer parameter, the data will continue to be stored in DynamoDB as a JSON object.

You can use PydanticSerializer to automatically serialize what's retrieved from the persistent storage based on the return type annotated.

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.serialization.pydantic import PydanticSerializer
from aws_lambda_powertools.utilities.parser import BaseModel
from aws_lambda_powertools.utilities.typing import LambdaContext

dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(event_key_jmespath="order_id")  # see Choosing a payload subset section


class OrderItem(BaseModel):
    sku: str
    description: str


class Order(BaseModel):
    item: OrderItem
    order_id: int


class OrderOutput(BaseModel):
    order_id: int


@idempotent_function(
    data_keyword_argument="order",
    config=config,
    persistence_store=dynamodb,
    output_serializer=PydanticSerializer,
)
# order output is inferred from return type
def process_order(order: Order) -> OrderOutput:  # (1)!
    return OrderOutput(order_id=order.order_id)


def lambda_handler(event: dict, context: LambdaContext):
    config.register_lambda_context(context)  # see Lambda timeouts section
    order_item = OrderItem(sku="fake", description="sample")
    order = Order(item=order_item, order_id=1)

    # `order` parameter must be called as a keyword argument to work
    process_order(order=order)
  1. We'll use OrderOutput to instantiate a new object using the data retrieved from persistent storage as input.

    This ensures the return of the function is not impacted when @idempotent_function is used.

Alternatively, you can provide an explicit model as an input to PydanticSerializer.

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.serialization.pydantic import PydanticSerializer
from aws_lambda_powertools.utilities.parser import BaseModel
from aws_lambda_powertools.utilities.typing import LambdaContext

dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(event_key_jmespath="order_id")  # see Choosing a payload subset section


class OrderItem(BaseModel):
    sku: str
    description: str


class Order(BaseModel):
    item: OrderItem
    order_id: int


class OrderOutput(BaseModel):
    order_id: int


@idempotent_function(
    data_keyword_argument="order",
    config=config,
    persistence_store=dynamodb,
    output_serializer=PydanticSerializer(model=OrderOutput),
)
def process_order(order: Order):
    return OrderOutput(order_id=order.order_id)


def lambda_handler(event: dict, context: LambdaContext):
    config.register_lambda_context(context)  # see Lambda timeouts section
    order_item = OrderItem(sku="fake", description="sample")
    order = Order(item=order_item, order_id=1)

    # `order` parameter must be called as a keyword argument to work
    process_order(order=order)

You can use DataclassSerializer to automatically serialize what's retrieved from the persistent storage based on the return type annotated.

from dataclasses import dataclass

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.serialization.dataclass import DataclassSerializer
from aws_lambda_powertools.utilities.typing import LambdaContext

dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(event_key_jmespath="order_id")  # see Choosing a payload subset section


@dataclass
class OrderItem:
    sku: str
    description: str


@dataclass
class Order:
    item: OrderItem
    order_id: int


@dataclass
class OrderOutput:
    order_id: int


@idempotent_function(
    data_keyword_argument="order",
    config=config,
    persistence_store=dynamodb,
    output_serializer=DataclassSerializer,
)
# order output is inferred from return type
def process_order(order: Order) -> OrderOutput:  # (1)!
    return OrderOutput(order_id=order.order_id)


def lambda_handler(event: dict, context: LambdaContext):
    config.register_lambda_context(context)  # see Lambda timeouts section
    order_item = OrderItem(sku="fake", description="sample")
    order = Order(item=order_item, order_id=1)

    # `order` parameter must be called as a keyword argument to work
    process_order(order=order)
  1. We'll use OrderOutput to instantiate a new object using the data retrieved from persistent storage as input.

    This ensures the return of the function is not impacted when @idempotent_function is used.

Alternatively, you can provide an explicit model as an input to DataclassSerializer.

from dataclasses import dataclass

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.serialization.dataclass import DataclassSerializer
from aws_lambda_powertools.utilities.typing import LambdaContext

dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(event_key_jmespath="order_id")  # see Choosing a payload subset section


@dataclass
class OrderItem:
    sku: str
    description: str


@dataclass
class Order:
    item: OrderItem
    order_id: int


@dataclass
class OrderOutput:
    order_id: int


@idempotent_function(
    data_keyword_argument="order",
    config=config,
    persistence_store=dynamodb,
    output_serializer=DataclassSerializer(model=OrderOutput),
)
def process_order(order: Order):
    return OrderOutput(order_id=order.order_id)


def lambda_handler(event: dict, context: LambdaContext):
    config.register_lambda_context(context)  # see Lambda timeouts section
    order_item = OrderItem(sku="fake", description="sample")
    order = Order(item=order_item, order_id=1)

    # `order` parameter must be called as a keyword argument to work
    process_order(order=order)

You can use CustomDictSerializer to have full control over the serialization process for any type. It expects two functions:

  • to_dict. Function to convert any type to a JSON serializable dictionary before it is saved into the persistent storage.
  • from_dict. Function to convert the dictionary retrieved from persistent storage back into its original form.
from typing import Dict, Type

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.serialization.custom_dict import CustomDictSerializer
from aws_lambda_powertools.utilities.typing import LambdaContext

dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(event_key_jmespath="order_id")  # see Choosing a payload subset section


class OrderItem:
    def __init__(self, sku: str, description: str):
        self.sku = sku
        self.description = description


class Order:
    def __init__(self, item: OrderItem, order_id: int):
        self.item = item
        self.order_id = order_id


class OrderOutput:
    def __init__(self, order_id: int):
        self.order_id = order_id


def order_to_dict(x: Type[OrderOutput]) -> Dict:  # (1)!
    return dict(x.__dict__)


def dict_to_order(x: Dict) -> OrderOutput:  # (2)!
    return OrderOutput(**x)


order_output_serializer = CustomDictSerializer(  # (3)!
    to_dict=order_to_dict,
    from_dict=dict_to_order,
)


@idempotent_function(
    data_keyword_argument="order",
    config=config,
    persistence_store=dynamodb,
    output_serializer=order_output_serializer,
)
def process_order(order: Order) -> OrderOutput:
    return OrderOutput(order_id=order.order_id)


def lambda_handler(event: dict, context: LambdaContext):
    config.register_lambda_context(context)  # see Lambda timeouts section
    order_item = OrderItem(sku="fake", description="sample")
    order = Order(item=order_item, order_id=1)

    # `order` parameter must be called as a keyword argument to work
    process_order(order=order)
  1. This function does the following

    1. Receives the return from process_order
    2. Converts it to a dictionary before it is saved into the persistent storage.
  2. This function does the following

    1. Receives the dictionary saved into the persistent storage
    2. Serializes it to OrderOutput before @idempotent_function returns to the caller.
  3. This serializer receives both functions so it knows which to call when serializing to and from a dictionary.

Batch integration

You can easily integrate with the Batch utility via a context manager. This ensures that you process each record in an idempotent manner, and guards against the Lambda timeout scenario covered in the Lambda timeouts section.

Choosing a unique batch record attribute

In this example, we choose messageId as our idempotency key since we know it'll be unique.

Depending on your use case, it might be more accurate to choose another field your producer intentionally set to define uniqueness.

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()
processor = BatchProcessor(event_type=EventType.SQS)

dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(
    event_key_jmespath="messageId",  # see Choosing a payload subset section
)


@idempotent_function(data_keyword_argument="record", config=config, persistence_store=dynamodb)
def record_handler(record: SQSRecord):
    return {"message": record.body}


def lambda_handler(event: SQSRecord, context: LambdaContext):
    config.register_lambda_context(context)  # see Lambda timeouts section

    # with Lambda context registered for Idempotency
    # we can now kick in the Batch processing logic
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        # in case you want to access each record processed by your record_handler
        # otherwise ignore the result variable assignment
        processed_messages = processor.process()
        logger.info(processed_messages)

    return processor.response()
{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
      "body": "Test message.",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "replace-to-pass-gitleak",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {
        "testAttr": {
          "stringValue": "100",
          "binaryValue": "base64Str",
          "dataType": "Number"
        }
      },
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-2"
    }
  ]
}

Choosing a payload subset for idempotency

Tip: Dealing with always changing payloads

When dealing with a more elaborate payload, where parts of the payload always change, you should use the event_key_jmespath parameter.

Use IdempotencyConfig to instruct the idempotent decorator to only use a portion of your payload to verify whether a request is idempotent, and therefore whether it should be executed again.

Payment scenario

In this example, we have a Lambda handler that creates a payment for a user subscribing to a product. We want to ensure that we don't accidentally charge our customer by subscribing them more than once.

Imagine the function executes successfully, but the client never receives the response due to a connection issue. It is safe to retry in this instance, as the idempotent decorator will return a previously saved response.

What we want here is to instruct Idempotency to use user_id and product_id fields from our incoming payload as our idempotency key. If we were to treat the entire request as our idempotency key, a simple HTTP header change would cause our customer to be charged twice.
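
A minimal sketch of that configuration, assuming user_id and product_id sit at the top level of the event (the full example below additionally deserializes the API Gateway body first):

from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig

# Only user_id and product_id contribute to the idempotency key
config = IdempotencyConfig(event_key_jmespath='["user_id", "product_id"]')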

Deserializing JSON strings in payloads for increased accuracy.

The payload extracted by the event_key_jmespath is treated as a string by default. This means there could be differences in whitespace even when the JSON payload itself is identical.

To alter this behaviour, we can use the JMESPath built-in function powertools_json() to treat the payload as a JSON object (dict) rather than a string.

import json
from dataclasses import dataclass, field
from uuid import uuid4

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

# Deserialize JSON string under the "body" key
# then extract "user_id" and "product_id" data
config = IdempotencyConfig(event_key_jmespath='powertools_json(body).["user_id", "product_id"]')


@dataclass
class Payment:
    user_id: str
    product_id: str
    payment_id: str = field(default_factory=lambda: f"{uuid4()}")


class PaymentError(Exception):
    ...


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        payment_info: str = event.get("body", "")
        payment: Payment = create_subscription_payment(json.loads(payment_info))
        return {
            "payment_id": payment.payment_id,
            "message": "success",
            "statusCode": 200,
        }
    except Exception as exc:
        raise PaymentError(f"Error creating payment {str(exc)}")


def create_subscription_payment(event: dict) -> Payment:
    return Payment(**event)
{
  "version": "2.0",
  "routeKey": "ANY /createpayment",
  "rawPath": "/createpayment",
  "rawQueryString": "",
  "headers": {
    "Header1": "value1",
    "Header2": "value2"
  },
  "requestContext": {
    "accountId": "123456789012",
    "apiId": "api-id",
    "domainName": "id.execute-api.us-east-1.amazonaws.com",
    "domainPrefix": "id",
    "http": {
      "method": "POST",
      "path": "/createpayment",
      "protocol": "HTTP/1.1",
      "sourceIp": "ip",
      "userAgent": "agent"
    },
    "requestId": "id",
    "routeKey": "ANY /createpayment",
    "stage": "$default",
    "time": "10/Feb/2021:13:40:43 +0000",
    "timeEpoch": 1612964443723
  },
  "body": "{\"user_id\":\"xyz\",\"product_id\":\"123456789\"}",
  "isBase64Encoded": false
}

Lambda timeouts

Note

This is automatically done when you decorate your Lambda handler with the @idempotent decorator.

To prevent extended failed retries when a Lambda function times out, Powertools for AWS Lambda (Python) calculates and includes the remaining invocation time as part of the idempotency record.

Example

If a second invocation happens after this timestamp, and the record is marked as INPROGRESS, we will execute the invocation again as if it was in the EXPIRED state (e.g., the expire_seconds field has elapsed).

This means that if an invocation expired during execution, it will be quickly executed again on the next retry.

Important

If you are only using the @idempotent_function decorator to guard isolated parts of your code, you must use register_lambda_context available in the idempotency config object to benefit from this protection.

Here is an example of how to register the Lambda context in your handler:

from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

config = IdempotencyConfig()


@idempotent_function(data_keyword_argument="record", persistence_store=persistence_layer, config=config)
def record_handler(record: SQSRecord):
    return {"message": record["body"]}


def lambda_handler(event: dict, context: LambdaContext):
    config.register_lambda_context(context)

    return record_handler(event)

Handling exceptions

If you are using the idempotent decorator on your Lambda handler, any unhandled exceptions that are raised during the code execution will cause the record in the persistence layer to be deleted. This means that new invocations will execute your code again despite having the same payload. If you don't want the record to be deleted, you need to catch exceptions within the idempotent function and return a successful response.
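
A minimal sketch of that approach, where the error is caught inside the decorated handler and returned as a response so the idempotency record is preserved (the business logic and error payload are illustrative):

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")


def charge_customer(event: dict) -> dict:
    # Hypothetical business logic that may raise
    return {"charge_id": "example"}


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        charge = charge_customer(event)
        return {"statusCode": 200, "charge_id": charge["charge_id"]}
    except Exception as exc:
        # Returning (instead of raising) keeps the idempotency record, so retries
        # with the same payload receive this same error response.
        return {"statusCode": 500, "message": f"error: {exc}"}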

sequenceDiagram
    participant Client
    participant Lambda
    participant Persistence Layer
    Client->>Lambda: Invoke (event)
    Lambda->>Persistence Layer: Get or set (id=event.search(payload))
    activate Persistence Layer
    Note right of Persistence Layer: Locked during this time. Prevents multiple<br/>Lambda invocations with the same<br/>payload running concurrently.
    Lambda--xLambda: Call handler (event).<br/>Raises exception
    Lambda->>Persistence Layer: Delete record (id=event.search(payload))
    deactivate Persistence Layer
    Lambda-->>Client: Return error response
Idempotent sequence exception

If you are using idempotent_function, any unhandled exceptions that are raised inside the decorated function will cause the record in the persistence layer to be deleted, and allow the function to be executed again if retried.

If an Exception is raised outside the scope of the decorated function and after your function has been called, the persistent record will not be affected. In this case, idempotency will be maintained for your decorated function. Example:

import requests

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

config = IdempotencyConfig()


def lambda_handler(event: dict, context: LambdaContext):
    # If an exception is raised here, no idempotent record will ever get created as the
    # idempotent function does not get called
    try:
        endpoint = "https://jsonplaceholder.typicode.com/comments/"  # change this endpoint to force an exception
        requests.get(endpoint)
    except Exception as exc:
        return str(exc)

    call_external_service(data={"user": "user1", "id": 5})

    # This exception will not cause the idempotent record to be deleted, since it
    # happens after the decorated function has been successfully called
    raise Exception


@idempotent_function(data_keyword_argument="data", config=config, persistence_store=persistence_layer)
def call_external_service(data: dict):
    result: requests.Response = requests.post(
        "https://jsonplaceholder.typicode.com/comments/",
        json={"user": data["user"], "transaction_id": data["id"]},
    )
    return result.json()
Warning

We will raise IdempotencyPersistenceLayerError if any of the calls to the persistence layer fail unexpectedly.

As this happens outside the scope of your decorated function, you are not able to catch it if you're using the idempotent decorator on your Lambda handler.

Persistence layers

DynamoDBPersistenceLayer

This persistence layer is built-in, allowing you to use an existing DynamoDB table or create a new one dedicated to idempotency state (recommended).

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(
    table_name="IdempotencyTable",
    key_attr="idempotency_key",
    expiry_attr="expires_at",
    in_progress_expiry_attr="in_progress_expires_at",
    status_attr="current_status",
    data_attr="result_data",
    validation_key_attr="validation_key",
)


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return event

When using DynamoDB as the persistence layer, you can customize the attribute names by passing the following parameters during the initialization of the persistence layer:

Parameter | Required | Default | Description
table_name | ✔ | | Table name to store state
key_attr | | id | Partition key of the table. Hashed representation of the payload (unless sort_key_attr is specified)
expiry_attr | | expiration | Unix timestamp of when record expires
in_progress_expiry_attr | | in_progress_expiration | Unix timestamp of when record expires while in progress (in case the invocation times out)
status_attr | | status | Stores status of the Lambda execution during and after invocation
data_attr | | data | Stores results of successfully executed Lambda handlers
validation_key_attr | | validation | Hashed representation of the parts of the event used for validation
sort_key_attr | | | Sort key of the table (if table is configured with a sort key)
static_pk_value | | idempotency#{LAMBDA_FUNCTION_NAME} | Static value to use as the partition key. Only used when sort_key_attr is set.
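
For example, if your existing table uses a composite key, a sketch of pointing the persistence layer at it might look like this (table and attribute names are illustrative):

from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer

persistence_layer = DynamoDBPersistenceLayer(
    table_name="SharedStateTable",
    sort_key_attr="sort_key",  # the idempotency key is stored in the sort key instead
    static_pk_value="idempotency#payments",  # optional; defaults to idempotency#{LAMBDA_FUNCTION_NAME}
)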

RedisPersistenceLayer

This persistence layer is built-in, allowing you to use an existing Redis service. For optimal performance and compatibility, it is strongly recommended to use a Redis service version 7 or higher.

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = RedisCachePersistenceLayer(
    host="localhost",
    port=6379,
    in_progress_expiry_attr="in_progress_expiration",
    status_attr="status",
    data_attr="data",
    validation_key_attr="validation",
)


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return event

When using Redis as the persistence layer, you can customize the attribute names by providing the following parameters upon initialization of the persistence layer:

Parameter | Required | Default | Description
in_progress_expiry_attr | | in_progress_expiration | Unix timestamp of when record expires while in progress (in case the invocation times out)
status_attr | | status | Stores status of the Lambda execution during and after invocation
data_attr | | data | Stores results of successfully executed Lambda handlers
validation_key_attr | | validation | Hashed representation of the parts of the event used for validation

Idempotency request flow

The following sequence diagrams explain how the Idempotency feature behaves under different scenarios.

Successful request

sequenceDiagram
    participant Client
    participant Lambda
    participant Persistence Layer
    alt initial request
        Client->>Lambda: Invoke (event)
        Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
        activate Persistence Layer
        Note over Lambda,Persistence Layer: Set record status to INPROGRESS. <br> Prevents concurrent invocations <br> with the same payload
        Lambda-->>Lambda: Call your function
        Lambda->>Persistence Layer: Update record with result
        deactivate Persistence Layer
        Persistence Layer-->>Persistence Layer: Update record
        Note over Lambda,Persistence Layer: Set record status to COMPLETE. <br> New invocations with the same payload <br> now return the same result
        Lambda-->>Client: Response sent to client
    else retried request
        Client->>Lambda: Invoke (event)
        Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
        activate Persistence Layer
        Persistence Layer-->>Lambda: Already exists in persistence layer.
        deactivate Persistence Layer
        Note over Lambda,Persistence Layer: Record status is COMPLETE and not expired
        Lambda-->>Client: Same response sent to client
    end
Idempotent successful request

Successful request with cache enabled

sequenceDiagram
    participant Client
    participant Lambda
    participant Persistence Layer
    alt initial request
      Client->>Lambda: Invoke (event)
      Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
      activate Persistence Layer
      Note over Lambda,Persistence Layer: Set record status to INPROGRESS. <br> Prevents concurrent invocations <br> with the same payload
      Lambda-->>Lambda: Call your function
      Lambda->>Persistence Layer: Update record with result
      deactivate Persistence Layer
      Persistence Layer-->>Persistence Layer: Update record
      Note over Lambda,Persistence Layer: Set record status to COMPLETE. <br> New invocations with the same payload <br> now return the same result
      Lambda-->>Lambda: Save record and result in memory
      Lambda-->>Client: Response sent to client
    else retried request
      Client->>Lambda: Invoke (event)
      Lambda-->>Lambda: Get idempotency_key=hash(payload)
      Note over Lambda,Persistence Layer: Record status is COMPLETE and not expired
      Lambda-->>Client: Same response sent to client
    end
Idempotent successful request cached

Successful request with response_hook configured

sequenceDiagram
    participant Client
    participant Lambda
    participant Response hook
    participant Persistence Layer
    alt initial request
        Client->>Lambda: Invoke (event)
        Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
        activate Persistence Layer
        Note over Lambda,Persistence Layer: Set record status to INPROGRESS. <br> Prevents concurrent invocations <br> with the same payload
        Lambda-->>Lambda: Call your function
        Lambda->>Persistence Layer: Update record with result
        deactivate Persistence Layer
        Persistence Layer-->>Persistence Layer: Update record
        Note over Lambda,Persistence Layer: Set record status to COMPLETE. <br> New invocations with the same payload <br> now return the same result
        Lambda-->>Client: Response sent to client
    else retried request
        Client->>Lambda: Invoke (event)
        Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
        activate Persistence Layer
        Persistence Layer-->>Response hook: Already exists in persistence layer.
        deactivate Persistence Layer
        Note over Response hook,Persistence Layer: Record status is COMPLETE and not expired
        Response hook->>Lambda: Response hook invoked
        Lambda-->>Client: Manipulated idempotent response sent to client
    end
Successful idempotent request with a response hook

Expired idempotency records

sequenceDiagram
    participant Client
    participant Lambda
    participant Persistence Layer
    alt initial request
        Client->>Lambda: Invoke (event)
        Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
        activate Persistence Layer
        Note over Lambda,Persistence Layer: Set record status to INPROGRESS. <br> Prevents concurrent invocations <br> with the same payload
        Lambda-->>Lambda: Call your function
        Lambda->>Persistence Layer: Update record with result
        deactivate Persistence Layer
        Persistence Layer-->>Persistence Layer: Update record
        Note over Lambda,Persistence Layer: Set record status to COMPLETE. <br> New invocations with the same payload <br> now return the same result
        Lambda-->>Client: Response sent to client
    else retried request
        Client->>Lambda: Invoke (event)
        Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
        activate Persistence Layer
        Persistence Layer-->>Lambda: Already exists in persistence layer.
        deactivate Persistence Layer
        Note over Lambda,Persistence Layer: Record status is COMPLETE but expired hours ago
        loop Repeat initial request process
            Note over Lambda,Persistence Layer: 1. Set record to INPROGRESS, <br> 2. Call your function, <br> 3. Set record to COMPLETE
        end
        Lambda-->>Client: Same response sent to client
    end
Previous Idempotent request expired

Concurrent identical in-flight requests

sequenceDiagram
    participant Client
    participant Lambda
    participant Persistence Layer
    Client->>Lambda: Invoke (event)
    Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
    activate Persistence Layer
    Note over Lambda,Persistence Layer: Set record status to INPROGRESS. <br> Prevents concurrent invocations <br> with the same payload
      par Second request
          Client->>Lambda: Invoke (event)
          Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
          Lambda--xLambda: IdempotencyAlreadyInProgressError
          Lambda->>Client: Error sent to client if unhandled
      end
    Lambda-->>Lambda: Call your function
    Lambda->>Persistence Layer: Update record with result
    deactivate Persistence Layer
    Persistence Layer-->>Persistence Layer: Update record
    Note over Lambda,Persistence Layer: Set record status to COMPLETE. <br> New invocations with the same payload <br> now return the same result
    Lambda-->>Client: Response sent to client
Concurrent identical in-flight requests

Lambda request timeout

sequenceDiagram
    participant Client
    participant Lambda
    participant Persistence Layer
    alt initial request
        Client->>Lambda: Invoke (event)
        Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
        activate Persistence Layer
        Note over Lambda,Persistence Layer: Set record status to INPROGRESS. <br> Prevents concurrent invocations <br> with the same payload
        Lambda-->>Lambda: Call your function
        Note right of Lambda: Time out
        Lambda--xLambda: Time out error
        Lambda-->>Client: Return error response
        deactivate Persistence Layer
    else retry after Lambda timeout elapses
        Client->>Lambda: Invoke (event)
        Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
        activate Persistence Layer
        Note over Lambda,Persistence Layer: Set record status to INPROGRESS. <br> Reset in_progress_expiry attribute
        Lambda-->>Lambda: Call your function
        Lambda->>Persistence Layer: Update record with result
        deactivate Persistence Layer
        Persistence Layer-->>Persistence Layer: Update record
        Lambda-->>Client: Response sent to client
    end
Idempotent request during and after Lambda timeouts

Optional idempotency key

sequenceDiagram
    participant Client
    participant Lambda
    participant Persistence Layer
    alt request with idempotency key
        Client->>Lambda: Invoke (event)
        Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
        activate Persistence Layer
        Note over Lambda,Persistence Layer: Set record status to INPROGRESS. <br> Prevents concurrent invocations <br> with the same payload
        Lambda-->>Lambda: Call your function
        Lambda->>Persistence Layer: Update record with result
        deactivate Persistence Layer
        Persistence Layer-->>Persistence Layer: Update record
        Note over Lambda,Persistence Layer: Set record status to COMPLETE. <br> New invocations with the same payload <br> now return the same result
        Lambda-->>Client: Response sent to client
    else request(s) without idempotency key
        Client->>Lambda: Invoke (event)
        Note over Lambda: Idempotency key is missing
        Note over Persistence Layer: Skips any operation to fetch, update, and delete
        Lambda-->>Lambda: Call your function
        Lambda-->>Client: Response sent to client
    end
Optional idempotency key

Race condition with Redis

graph TD;
    A(Existing orphan record in redis)-->A1;
    A1[Two Lambda invoke at same time]-->B1[Lambda handler1];
    B1-->B2[Fetch from Redis];
    B2-->B3[Handler1 got orphan record];
    B3-->B4[Handler1 acquired lock];
    B4-->B5[Handler1 overwrite orphan record]
    B5-->B6[Handler1 continue to execution];
    A1-->C1[Lambda handler2];
    C1-->C2[Fetch from Redis];
    C2-->C3[Handler2 got orphan record];
    C3-->C4[Handler2 failed to acquire lock];
    C4-->C5[Handler2 wait and fetch from Redis];
    C5-->C6[Handler2 return without executing];
    B6-->D(Lambda handler executed only once);
    C6-->D;
Race condition with Redis

Redis as persistent storage layer provider

Redis resources

Before setting up Redis as the persistent storage layer provider, you must have an existing Redis service. We recommend using Redis-compatible services such as Amazon ElastiCache for Redis or Amazon MemoryDB for Redis as your persistent storage layer provider.

No existing Redis service?

If you don't have an existing Redis service, we recommend using DynamoDB as the persistent storage layer provider.

AWSTemplateFormatVersion: '2010-09-09'

Resources:
  RedisServerlessIdempotency:
    Type: AWS::ElastiCache::ServerlessCache
    Properties:
      Engine: redis
      ServerlessCacheName: redis-cache
      SecurityGroupIds: # (1)!
          - security-{your_sg_id}
      SubnetIds:
          - subnet-{your_subnet_id_1}
          - subnet-{your_subnet_id_2}
  1. Replace the Security Group ID and Subnet ID to match your VPC settings.

VPC Access

Your Lambda Function must have network access to the Redis endpoint before using it as the idempotency persistent storage layer. In most cases, you will need to configure VPC access for your Lambda Function.

Amazon ElastiCache/MemoryDB for Redis as persistent storage layer provider

If you plan to use Amazon ElastiCache for Redis as the idempotency persistent storage layer, you may find this AWS tutorial helpful. For those using Amazon MemoryDB for Redis, refer to this AWS tutorial specifically for the VPC setup guidance.

After completing the VPC setup, you can use the templates provided below to set up Lambda functions with access to VPC internal subnets.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.11
      Handler: app.py
      VpcConfig: # (1)!
        SecurityGroupIds:
          - security-{your_sg_id}
        SubnetIds:
          - subnet-{your_subnet_id_1}
          - subnet-{your_subnet_id_2}
  1. Replace the Security Group ID and Subnet ID to match your VPC settings.

Configuring Redis persistence layer

You can quickly get started by initializing the RedisCachePersistenceLayer class and applying the idempotent decorator to your Lambda handler. For a detailed example of using the RedisCachePersistenceLayer, refer to the Persistence layers section.

Info

We enforce security best practices by using SSL connections in the RedisCachePersistenceLayer; to disable it, set ssl=False.
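
For example, a sketch of disabling SSL when testing against a local Redis container (not recommended for production):

from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)

# Local testing only: plain-text connection to a local Redis instance
persistence_layer = RedisCachePersistenceLayer(host="localhost", port=6379, ssl=False)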

from dataclasses import dataclass, field
from uuid import uuid4

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = RedisCachePersistenceLayer(host="localhost", port=6379)


@dataclass
class Payment:
    user_id: str
    product_id: str
    payment_id: str = field(default_factory=lambda: f"{uuid4()}")


class PaymentError(Exception):
    ...


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        payment: Payment = create_subscription_payment(event)
        return {
            "payment_id": payment.payment_id,
            "message": "success",
            "statusCode": 200,
        }
    except Exception as exc:
        raise PaymentError(f"Error creating payment {str(exc)}")


def create_subscription_payment(event: dict) -> Payment:
    return Payment(**event)
from dataclasses import dataclass, field
from uuid import uuid4

from redis import Redis

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

client = Redis(
    host="localhost",
    port=6379,
    socket_connect_timeout=5,
    socket_timeout=5,
    max_connections=1000,
)

persistence_layer = RedisCachePersistenceLayer(client=client)


@dataclass
class Payment:
    user_id: str
    product_id: str
    payment_id: str = field(default_factory=lambda: f"{uuid4()}")


class PaymentError(Exception):
    ...


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        payment: Payment = create_subscription_payment(event)
        return {
            "payment_id": payment.payment_id,
            "message": "success",
            "statusCode": 200,
        }
    except Exception as exc:
        raise PaymentError(f"Error creating payment {str(exc)}")


def create_subscription_payment(event: dict) -> Payment:
    return Payment(**event)
{
  "user_id": "xyz",
  "product_id": "123456789"
}

Custom advanced settings

For advanced configurations, such as setting up SSL certificates or customizing parameters like a custom timeout, you can utilize the Redis client to tailor these specific settings to your needs.

from typing import Any

from redis import Redis

from aws_lambda_powertools.utilities import parameters
from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig, idempotent
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)

redis_values: Any = parameters.get_secret("redis_info", transform="json")  # (1)!

redis_client = Redis(
    host=redis_values.get("REDIS_HOST"),
    port=redis_values.get("REDIS_PORT"),
    password=redis_values.get("REDIS_PASSWORD"),
    decode_responses=True,
    socket_timeout=10.0,
    ssl=True,
    retry_on_timeout=True,
)

persistence_layer = RedisCachePersistenceLayer(client=redis_client)
config = IdempotencyConfig(
    expires_after_seconds=2 * 60,  # 2 minutes
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):
    return {"message": "Hello"}
  1. JSON stored: { "REDIS_HOST": "127.0.0.1", "REDIS_PORT": "6379", "REDIS_PASSWORD": "redis-secret" }
from typing import Any

from redis import Redis

from aws_lambda_powertools.shared.functions import abs_lambda_path
from aws_lambda_powertools.utilities import parameters
from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig, idempotent
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)

redis_values: Any = parameters.get_secret("redis_info", transform="json")  # (1)!


redis_client = Redis(
    host=redis_values.get("REDIS_HOST"),
    port=redis_values.get("REDIS_PORT"),
    password=redis_values.get("REDIS_PASSWORD"),
    decode_responses=True,
    socket_timeout=10.0,
    ssl=True,
    retry_on_timeout=True,
    ssl_certfile=f"{abs_lambda_path()}/certs/redis_user.crt",  # (2)!
    ssl_keyfile=f"{abs_lambda_path()}/certs/redis_user_private.key",  # (3)!
    ssl_ca_certs=f"{abs_lambda_path()}/certs/redis_ca.pem",  # (4)!
)

persistence_layer = RedisCachePersistenceLayer(client=redis_client)
config = IdempotencyConfig(
    expires_after_seconds=2 * 60,  # 2 minutes
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):
    return {"message": "Hello"}
  1. JSON stored: { "REDIS_HOST": "127.0.0.1", "REDIS_PORT": "6379", "REDIS_PASSWORD": "redis-secret" }
  2. redis_user.crt file stored in the "certs" directory of your Lambda function
  3. redis_user_private.key file stored in the "certs" directory of your Lambda function
  4. redis_ca.pem file stored in the "certs" directory of your Lambda function

Advanced

Customizing the default behavior

The idempotent decorator can be further configured with IdempotencyConfig, as seen in the previous example. These are the available options for further configuration:

| Parameter | Default | Description |
| --- | --- | --- |
| event_key_jmespath | `""` | JMESPath expression to extract the idempotency key from the event record using built-in functions |
| payload_validation_jmespath | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the idempotency key stays the same |
| raise_on_no_idempotency_key | `False` | Raise an exception if no idempotency key was found in the request |
| expires_after_seconds | 3600 | The number of seconds to wait before a record is expired |
| use_local_cache | `False` | Whether to locally cache idempotency results |
| local_cache_max_items | 256 | Max number of items to store in the local cache |
| hash_function | `md5` | Function to use for calculating hashes, as provided by hashlib in the standard library |
| response_hook | `None` | Function to use for processing the stored idempotent response. This function hook is called when an existing idempotent response is found. See Manipulating the Idempotent Response |
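
As a reference, here is a hedged sketch combining several of these options in a single IdempotencyConfig; the table name and JMESPath expressions are illustrative only.

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)

# Illustrative values only; adjust the table name and JMESPath to your event shape
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(
    event_key_jmespath="body",             # hash only the body of the event
    payload_validation_jmespath="amount",  # fail fast if "amount" changes for the same key
    raise_on_no_idempotency_key=True,      # refuse to proceed without an idempotency key
    expires_after_seconds=15 * 60,         # 15 minutes
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):
    return {"message": "Hello"}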

Handling concurrent executions with the same payload

This utility will raise an IdempotencyAlreadyInProgressError exception if you receive multiple invocations with the same payload while the first invocation hasn't completed yet.

Info

If you receive IdempotencyAlreadyInProgressError, you can safely retry the operation.

This is a locking mechanism for correctness. Since we don't know the result from the first invocation yet, we can't safely allow another concurrent execution.
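
If you call your business logic through idempotent_function, one way to handle this is to catch the exception and signal the caller to retry later. This is a minimal sketch rather than an official example; the function name and response shape are hypothetical.

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.exceptions import (
    IdempotencyAlreadyInProgressError,
)

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig()


@idempotent_function(data_keyword_argument="order", config=config, persistence_store=persistence_layer)
def process_order(order: dict) -> dict:
    return {"order_id": "1234", "status": "processed"}  # hypothetical business logic


def lambda_handler(event, context):
    config.register_lambda_context(context)
    try:
        return process_order(order=event["order"])
    except IdempotencyAlreadyInProgressError:
        # Another invocation with the same payload is still running; ask the caller to retry shortly
        return {"statusCode": 409, "message": "Request already in progress, retry later"}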

Using in-memory cache

By default, in-memory local caching is disabled, since we don't know how much memory you consume per invocation compared to the maximum configured in your Lambda function.

Note: This in-memory cache is local to each Lambda execution environment

This means it will be effective in cases where your function's concurrency is low compared to the number of "retry" invocations with the same payload, since retries that land on a different execution environment will find an empty cache.

You can enable in-memory caching with the use_local_cache parameter:

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(
    event_key_jmespath="body",
    use_local_cache=True,
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context: LambdaContext):
    return event
{
  "body": "{\"user_id\":\"xyz\",\"product_id\":\"123456789\"}"
}

When enabled, the default is to cache a maximum of 256 records in each Lambda execution environment. You can change this limit with the local_cache_max_items parameter, as shown in the sketch below.
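
A minimal sketch raising the cap, assuming the same DynamoDB table as in the earlier examples; the value 512 is illustrative.

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(
    event_key_jmespath="body",
    use_local_cache=True,
    local_cache_max_items=512,  # illustrative value; the default is 256
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):
    return event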

Expiring idempotency records

By default, we expire idempotency records after an hour (3600 seconds).

In most cases, it is not desirable to store the idempotency records forever. Rather, you want to guarantee that the same payload won't be executed within a period of time.

You can change this window with the expires_after_seconds parameter:

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(
    event_key_jmespath="body",
    expires_after_seconds=5 * 60,  # 5 minutes
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context: LambdaContext):
    return event
{
  "body": "{\"user_id\":\"xyz\",\"product_id\":\"123456789\"}"
}

This will mark any records older than 5 minutes as expired, and your function will be executed as normal if it is invoked with a matching payload.

Idempotency record expiration vs DynamoDB time-to-live (TTL)

DynamoDB TTL is a feature that removes items after a certain period of time; actual deletion may occur up to 48 hours after expiration.

We don't rely on DynamoDB or any other persistence storage layer to determine whether a record is expired, to avoid inconsistent states caused by delayed deletions.

Instead, idempotency records saved in the storage layer contain timestamps that are verified upon retrieval and double-checked within the Idempotency feature.

Why?

A record might still be valid (COMPLETE) when we retrieve it, but in rare cases it might expire a second later. A record could also be cached in memory. You might also want idempotent transactions that expire in seconds.
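
If you also want DynamoDB to eventually delete expired idempotency items (purely for storage hygiene, never for correctness), you can enable TTL on the expiration attribute yourself. A minimal boto3 sketch, assuming the default attribute name and an illustrative table name:

import boto3

dynamodb = boto3.client("dynamodb")

# Enable DynamoDB TTL on the attribute used by the persistence layer ("expiration" by default).
# Actual deletion may lag expiry by up to 48 hours; the utility never relies on it for correctness.
dynamodb.update_time_to_live(
    TableName="IdempotencyTable",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "expiration"},
)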

Payload validation

Question: What if your function is invoked with the same payload except some outer parameters have changed?

Example: A payment transaction for a given product ID was requested twice for the same customer; however, the amount to be paid changed in the second transaction.

By default, we would return the same result as the initial request; in this instance, that may be misleading. We provide fail-fast payload validation to address this edge case.

With payload_validation_jmespath, you can provide an additional JMESPath expression specifying which part of the event body should be validated against previous idempotent invocations:

from dataclasses import dataclass, field
from uuid import uuid4

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(event_key_jmespath='["user_id", "product_id"]', payload_validation_jmespath="amount")


@dataclass
class Payment:
    user_id: str
    product_id: str
    charge_type: str
    amount: int
    payment_id: str = field(default_factory=lambda: f"{uuid4()}")


class PaymentError(Exception):
    ...


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        payment: Payment = create_subscription_payment(event)
        return {
            "payment_id": payment.payment_id,
            "message": "success",
            "statusCode": 200,
        }
    except Exception as exc:
        raise PaymentError(f"Error creating payment {str(exc)}")


def create_subscription_payment(event: dict) -> Payment:
    return Payment(**event)
{
  "user_id": 1,
  "product_id": 1500,
  "charge_type": "subscription",
  "amount": 500
}
{
  "user_id": 1,
  "product_id": 1500,
  "charge_type": "subscription",
  "amount": 10
}

In this example, the user_id and product_id keys are used as the payload to generate the idempotency key, as per the event_key_jmespath parameter.

Note

If we try to send the same request but with a different amount, we will raise IdempotencyValidationError.

Without payload validation, we would have returned the same result as we did for the initial request. Since we're also returning an amount in the response, this could be quite confusing for the client.

By using payload_validation_jmespath="amount", we prevent this potentially confusing behavior and instead raise an Exception.
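
If you prefer to convert that exception into an explicit error response rather than letting it propagate, a hedged sketch using idempotent_function around the payment logic could look like the following; the function, key names, and status codes are illustrative.

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.exceptions import (
    IdempotencyValidationError,
)

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(
    event_key_jmespath='["user_id", "product_id"]',
    payload_validation_jmespath="amount",
)


@idempotent_function(data_keyword_argument="payment", config=config, persistence_store=persistence_layer)
def create_payment(payment: dict) -> dict:
    return {"payment_id": "abc123", "statusCode": 200}  # hypothetical business logic


def lambda_handler(event, context):
    config.register_lambda_context(context)
    try:
        return create_payment(payment=event)
    except IdempotencyValidationError:
        # Same idempotency key, but the validated "amount" differs from the original request
        return {"statusCode": 422, "message": "Conflicting payload for an existing request"}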

Making idempotency key required

If you want to enforce that an idempotency key is required, you can set raise_on_no_idempotency_key to True.

This means that we will raise IdempotencyKeyError if the evaluation of event_key_jmespath is None.

Warning

To prevent errors, transactions will not be treated as idempotent if raise_on_no_idempotency_key is set to False and the evaluation of event_key_jmespath is None. Therefore, no data will be fetched, stored, or deleted in the idempotency storage layer.

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(
    event_key_jmespath='["user.uid", "order_id"]',
    raise_on_no_idempotency_key=True,
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return event
{
  "user": {
    "uid": "BB0D045C-8878-40C8-889E-38B3CB0A61B1",
    "name": "Foo"
  },
  "order_id": 10000
}
{
  "user": {
    "uid": "BB0D045C-8878-40C8-889E-38B3CB0A61B1",
    "name": "Foo",
    "order_id": 10000
  }
}

Customizing boto configuration

The boto_config and boto3_session parameters enable you to pass in a custom botocore config object or a custom boto3 session when constructing the persistence store.

import boto3

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

# See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#module-boto3.session
boto3_session = boto3.session.Session()

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable", boto3_session=boto3_session)

config = IdempotencyConfig(event_key_jmespath="body")


@idempotent(persistence_store=persistence_layer, config=config)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return event
from botocore.config import Config

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

# See: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html#botocore-config
boto_config = Config()

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable", boto_config=boto_config)

config = IdempotencyConfig(event_key_jmespath="body")


@idempotent(persistence_store=persistence_layer, config=config)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return event
{
  "body": "{\"user_id\":\"xyz\",\"product_id\":\"123456789\"}"
}

Using a DynamoDB table with a composite primary key

When using a table with a composite primary key (hash + range key), use the sort_key_attr parameter when initializing your persistence layer.

With this setting, we will save the idempotency key in the sort key instead of the partition key. By default, the partition key will be set to idempotency#{LAMBDA_FUNCTION_NAME}.

You can optionally set a static value for the partition key using the static_pk_value parameter.

import json

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable", sort_key_attr="sort_key")


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    user_id: str = json.loads(event.get("body", "{}"))["user_id"]
    return {"message": "success", "user_id": user_id}
{
  "body": "{\"user_id\":\"xyz\",\"product_id\":\"123456789\"}"
}

The example function above would cause data to be stored in DynamoDB like this:

| id | sort_key | expiration | status | data |
| --- | --- | --- | --- | --- |
| idempotency#MyLambdaFunction | 1e956ef7da78d0cb890be999aecc0c9e | 1636549553 | COMPLETED | {"user_id": 12391, "message": "success"} |
| idempotency#MyLambdaFunction | 2b2cdb5f86361e97b4383087c1ffdf27 | 1636549571 | COMPLETED | {"user_id": 527212, "message": "success"} |
| idempotency#MyLambdaFunction | f091d2527ad1c78f05d54cc3f363be80 | 1636549585 | IN_PROGRESS | |
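
If you prefer not to use the default idempotency#{LAMBDA_FUNCTION_NAME} value for the partition key, here is a minimal sketch using the static_pk_value parameter; the value shown is illustrative.

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    idempotent,
)

persistence_layer = DynamoDBPersistenceLayer(
    table_name="IdempotencyTable",
    sort_key_attr="sort_key",
    static_pk_value="idempotency#my-service",  # illustrative static partition key value
)


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event, context):
    return {"message": "success"}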

Bring your own persistent store

This utility provides an abstract base class (ABC), so that you can implement your choice of persistent storage layer.

You can create your own persistent store from scratch by inheriting the BasePersistenceLayer class, and implementing _get_record(), _put_record(), _update_record() and _delete_record().

  • _get_record() – Retrieves an item from the persistence store using an idempotency key and returns it as a DataRecord instance.
  • _put_record() – Adds a DataRecord to the persistence store if it doesn't already exist with that key. Raises an ItemAlreadyExists exception if a non-expired entry already exists.
  • _update_record() – Updates an item in the persistence store.
  • _delete_record() – Removes an item from the persistence store.
import datetime
import logging
from typing import Any, Dict, Optional

import boto3
from botocore.config import Config

from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer
from aws_lambda_powertools.utilities.idempotency.exceptions import (
    IdempotencyItemAlreadyExistsError,
    IdempotencyItemNotFoundError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord

logger = logging.getLogger(__name__)


class MyOwnPersistenceLayer(BasePersistenceLayer):
    def __init__(
        self,
        table_name: str,
        key_attr: str = "id",
        expiry_attr: str = "expiration",
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
        boto_config: Optional[Config] = None,
        boto3_session: Optional[boto3.session.Session] = None,
    ):
        boto_config = boto_config or Config()
        session = boto3_session or boto3.session.Session()
        self._ddb_resource = session.resource("dynamodb", config=boto_config)
        self.table_name = table_name
        self.table = self._ddb_resource.Table(self.table_name)
        self.key_attr = key_attr
        self.expiry_attr = expiry_attr
        self.status_attr = status_attr
        self.data_attr = data_attr
        self.validation_key_attr = validation_key_attr
        super(MyOwnPersistenceLayer, self).__init__()

    def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
        """
        Translate raw item records from DynamoDB to DataRecord

        Parameters
        ----------
        item: Dict[str, Union[str, int]]
                Item format from dynamodb response

        Returns
        -------
        DataRecord
                representation of item

        """
        return DataRecord(
            idempotency_key=item[self.key_attr],
            status=item[self.status_attr],
            expiry_timestamp=item[self.expiry_attr],
            response_data=item.get(self.data_attr, ""),
            payload_hash=item.get(self.validation_key_attr, ""),
        )

    def _get_record(self, idempotency_key) -> DataRecord:
        response = self.table.get_item(Key={self.key_attr: idempotency_key}, ConsistentRead=True)

        try:
            item = response["Item"]
        except KeyError:
            raise IdempotencyItemNotFoundError
        return self._item_to_data_record(item)

    def _put_record(self, data_record: DataRecord) -> None:
        item = {
            self.key_attr: data_record.idempotency_key,
            self.expiry_attr: data_record.expiry_timestamp,
            self.status_attr: data_record.status,
        }

        if self.payload_validation_enabled:
            item[self.validation_key_attr] = data_record.payload_hash

        now = datetime.datetime.now()
        try:
            logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")
            self.table.put_item(
                Item=item,
                ConditionExpression=f"attribute_not_exists({self.key_attr}) OR {self.expiry_attr} < :now",
                ExpressionAttributeValues={":now": int(now.timestamp())},
            )
        except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException:
            logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
            raise IdempotencyItemAlreadyExistsError

    def _update_record(self, data_record: DataRecord):
        logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
        update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
        expression_attr_values = {
            ":expiry": data_record.expiry_timestamp,
            ":response_data": data_record.response_data,
            ":status": data_record.status,
        }
        expression_attr_names = {
            "#response_data": self.data_attr,
            "#expiry": self.expiry_attr,
            "#status": self.status_attr,
        }

        if self.payload_validation_enabled:
            update_expression += ", #validation_key = :validation_key"
            expression_attr_values[":validation_key"] = data_record.payload_hash
            expression_attr_names["#validation_key"] = self.validation_key_attr

        kwargs = {
            "Key": {self.key_attr: data_record.idempotency_key},
            "UpdateExpression": update_expression,
            "ExpressionAttributeValues": expression_attr_values,
            "ExpressionAttributeNames": expression_attr_names,
        }

        self.table.update_item(**kwargs)

    def _delete_record(self, data_record: DataRecord) -> None:
        logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}")
        self.table.delete_item(
            Key={self.key_attr: data_record.idempotency_key},
        )
Danger

Pay attention to the documentation for each - you may need to perform additional checks inside these methods to ensure the idempotency guarantees remain intact.

For example, the _put_record method needs to raise an exception if a non-expired record already exists in the data store with a matching key.

Manipulating the Idempotent Response

You can set up a response_hook in the IdempotencyConfig class to manipulate the returned data when an operation is idempotent. The hook function will be called with the current deserialized response object and the idempotency record.

import datetime
import uuid
from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
    DataRecord,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


def my_response_hook(response: Dict, idempotent_data: DataRecord) -> Dict:
    # Return inserted Header data into the Idempotent Response
    response["x-idempotent-key"] = idempotent_data.idempotency_key

    # expiry_timestamp could be None so include if set
    expiry_timestamp = idempotent_data.expiry_timestamp
    if expiry_timestamp:
        expiry_time = datetime.datetime.fromtimestamp(int(expiry_timestamp))
        response["x-idempotent-expiration"] = expiry_time.isoformat()

    # Must return the response here
    return response


dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(response_hook=my_response_hook)


@idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb)
def process_order(order: dict) -> dict:
    # create the order_id
    order_id = str(uuid.uuid4())

    # create your logic to save the order
    # append the order_id created
    order["order_id"] = order_id

    # return the order
    return {"order": order}


def lambda_handler(event: dict, context: LambdaContext):
    config.register_lambda_context(context)  # see Lambda timeouts section
    try:
        logger.info(f"Processing order id {event.get('order_id')}")
        return process_order(order=event.get("order"))
    except Exception as err:
        return {"status_code": 400, "error": f"Error processing {str(err)}"}
{
  "order" : {
    "user_id": "xyz",
    "product_id": "123456789",
    "quantity": 2,
    "value": 30
  }
}
Info: Using custom de-serialization?

The response_hook is called after the custom de-serialization so the payload you process will be the de-serialized version.

Being a good citizen

When using response hooks to manipulate returned data from idempotent operations, it's important to follow best practices to avoid introducing complexity or issues. Keep these guidelines in mind:

  1. The response hook is called exclusively when operations are idempotent. It will not be called when an operation is not idempotent, or when the idempotent logic fails.

  2. Catch and handle exceptions. Your response hook code should catch and handle any exceptions that may arise from your logic; unhandled exceptions will cause the Lambda function to fail unexpectedly (see the sketch after this list).

  3. Keep hook logic simple. Response hooks should consist of minimal, straightforward logic for manipulating response data. Avoid complex conditional branching and aim for hooks that are easy to reason about.
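
As an illustration of points 2 and 3, a hedged sketch of a defensive response hook; the header name is hypothetical.

from typing import Dict

from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
    DataRecord,
)


def my_response_hook(response: Dict, idempotent_data: DataRecord) -> Dict:
    try:
        # Keep the logic minimal: annotate the response and nothing else
        response["x-idempotent-key"] = idempotent_data.idempotency_key
    except Exception:
        # Never let hook errors fail an otherwise successful idempotent response
        pass
    return response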

Compatibility with other utilities

Batch

See Batch integration above.

Validation utility

The idempotency utility can be used with the validator decorator. Ensure that idempotency is the innermost decorator.

Warning

If you use an envelope with the validator, the event received by the idempotency utility will be the unwrapped event - not the "raw" event Lambda was invoked with.

Make sure to account for this behavior if you set event_key_jmespath.

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.validation import envelopes, validator

config = IdempotencyConfig(event_key_jmespath='["message", "username"]')
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")


@validator(envelope=envelopes.API_GATEWAY_HTTP)
@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context: LambdaContext):
    return {"message": event["message"], "statusCode": 200}
{
  "version": "2.0",
  "routeKey": "$default",
  "rawPath": "/my/path",
  "rawQueryString": "parameter1=value1&parameter1=value2&parameter2=value",
  "cookies": [
    "cookie1",
    "cookie2"
  ],
  "headers": {
    "Header1": "value1",
    "Header2": "value1,value2"
  },
  "queryStringParameters": {
    "parameter1": "value1,value2",
    "parameter2": "value"
  },
  "requestContext": {
    "accountId": "123456789012",
    "apiId": "api-id",
    "authentication": {
      "clientCert": {
        "clientCertPem": "CERT_CONTENT",
        "subjectDN": "www.example.com",
        "issuerDN": "Example issuer",
        "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1",
        "validity": {
          "notBefore": "May 28 12:30:02 2019 GMT",
          "notAfter": "Aug  5 09:36:04 2021 GMT"
        }
      }
    },
    "authorizer": {
      "jwt": {
        "claims": {
          "claim1": "value1",
          "claim2": "value2"
        },
        "scopes": [
          "scope1",
          "scope2"
        ]
      }
    },
    "domainName": "id.execute-api.us-east-1.amazonaws.com",
    "domainPrefix": "id",
    "http": {
      "method": "POST",
      "path": "/my/path",
      "protocol": "HTTP/1.1",
      "sourceIp": "192.168.0.1/32",
      "userAgent": "agent"
    },
    "requestId": "id",
    "routeKey": "$default",
    "stage": "$default",
    "time": "12/Mar/2020:19:03:58 +0000",
    "timeEpoch": 1583348638390
  },
  "body": "{\"message\": \"hello world\", \"username\": \"tom\"}",
  "pathParameters": {
    "parameter1": "value1"
  },
  "isBase64Encoded": false,
  "stageVariables": {
    "stageVariable1": "value1",
    "stageVariable2": "value2"
  }
}
Tip: Powertools for AWS Lambda (Python) JMESPath functions are also available

Built-in functions known from the validation utility, such as powertools_json, powertools_base64, and powertools_base64_gzip, are also available to use in this utility, as in the sketch below.
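
For instance, a hedged sketch using powertools_json() to deserialize a JSON-encoded body before extracting the idempotency key; the table name and selected fields are illustrative.

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

# powertools_json() deserializes the JSON string in "body" so fields can be selected from it
config = IdempotencyConfig(event_key_jmespath='powertools_json(body).["user_id", "product_id"]')


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):
    return {"message": "success"}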

Tracer

The idempotency utility can be used with the tracer decorator. Ensure that idempotency is the innermost decorator.
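
A minimal sketch of the expected decorator ordering (Tracer outermost, idempotency innermost); the table name is illustrative.

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    idempotent,
)

tracer = Tracer()
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")


@tracer.capture_lambda_handler
@idempotent(persistence_store=persistence_layer)  # idempotency stays innermost
def lambda_handler(event, context):
    return {"message": "success"}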

First execution

During the first invocation with a given payload, the idempotency utility performs a PutItem followed by an UpdateItem operation to persist the record in DynamoDB.

Tracer showcase

Subsequent executions

On subsequent invocations with the same payload, the idempotency utility optimistically tries to save the record in DynamoDB. If the record already exists, DynamoDB returns the existing item and the saved result is reused.

Explore how to handle conditional write errors in high-concurrency scenarios with DynamoDB in this blog post.

Tracer showcase

Testing your code

The idempotency utility provides several routes to test your code.

Disabling the idempotency utility

When testing your code, you may wish to disable the idempotency logic altogether and focus on testing your business logic. To do this, you can set the environment variable POWERTOOLS_IDEMPOTENCY_DISABLED with a truthy value. If you prefer setting this for specific tests, and are using Pytest, you can use monkeypatch fixture:

from dataclasses import dataclass

import app_test_disabling_idempotency_utility
import pytest


@pytest.fixture
def lambda_context():
    @dataclass
    class LambdaContext:
        function_name: str = "test"
        memory_limit_in_mb: int = 128
        invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
        aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

        def get_remaining_time_in_millis(self) -> int:
            return 5

    return LambdaContext()


def test_idempotent_lambda_handler(monkeypatch, lambda_context):
    # Set POWERTOOLS_IDEMPOTENCY_DISABLED before calling decorated functions
    monkeypatch.setenv("POWERTOOLS_IDEMPOTENCY_DISABLED", "1")

    result = app_test_disabling_idempotency_utility.lambda_handler({}, lambda_context)

    assert result
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    print("expensive operation")
    return {
        "payment_id": 12345,
        "message": "success",
        "statusCode": 200,
    }

Testing with DynamoDB Local

To test with DynamoDB Local, you can replace the DynamoDB client used by the persistence layer with one you create inside your tests. This allows you to set the endpoint_url.

from dataclasses import dataclass

import app_test_dynamodb_local
import boto3
import pytest


@pytest.fixture
def lambda_context():
    @dataclass
    class LambdaContext:
        function_name: str = "test"
        memory_limit_in_mb: int = 128
        invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
        aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

        def get_remaining_time_in_millis(self) -> int:
            return 5

    return LambdaContext()


def test_idempotent_lambda(lambda_context):
    # Configure the boto3 to use the endpoint for the DynamoDB Local instance
    dynamodb_local_client = boto3.client("dynamodb", endpoint_url="http://localhost:8000")
    app_test_dynamodb_local.persistence_layer.client = dynamodb_local_client

    # If desired, you can use a different DynamoDB Local table name than what your code already uses
    # app.persistence_layer.table_name = "another table name" # noqa: ERA001

    result = app_test_dynamodb_local.lambda_handler({"testkey": "testvalue"}, lambda_context)
    assert result["payment_id"] == 12345
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    print("expensive operation")
    return {
        "payment_id": 12345,
        "message": "success",
        "statusCode": 200,
    }

How do I mock all DynamoDB I/O operations

The idempotency utility lazily creates the DynamoDB Table resource it uses to access DynamoDB. This means it is possible to pass a mocked Table resource, or stub various methods.

from dataclasses import dataclass
from unittest.mock import MagicMock

import app_test_io_operations
import pytest


@pytest.fixture
def lambda_context():
    @dataclass
    class LambdaContext:
        function_name: str = "test"
        memory_limit_in_mb: int = 128
        invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
        aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

        def get_remaining_time_in_millis(self) -> int:
            return 5

    return LambdaContext()


def test_idempotent_lambda(lambda_context):
    mock_client = MagicMock()
    app_test_io_operations.persistence_layer.client = mock_client
    result = app_test_io_operations.lambda_handler({"testkey": "testvalue"}, lambda_context)
    mock_client.put_item.assert_called()
    assert result
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    print("expensive operation")
    return {
        "payment_id": 12345,
        "message": "success",
        "statusCode": 200,
    }

Testing with Redis

To test locally, you can either utilize fakeredis-py for a simulated Redis environment or refer to the MockRedis class used in our tests to mock Redis operations.
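
If you go the fakeredis-py route, a hedged sketch could look like the following; it assumes fakeredis is installed and reuses a lambda_context fixture like the one in the MockRedis example below.

import fakeredis

from aws_lambda_powertools.utilities.idempotency import idempotent
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)


def test_idempotent_lambda_with_fakeredis(lambda_context):
    # In-memory Redis stand-in; no server required
    redis_client = fakeredis.FakeStrictRedis(decode_responses=True)
    persistence_layer = RedisCachePersistenceLayer(client=redis_client)

    @idempotent(persistence_store=persistence_layer)
    def lambda_handler(event, context):
        return {"payment_id": 12345, "message": "success", "statusCode": 200}

    result = lambda_handler({"testkey": "testvalue"}, lambda_context)
    assert result["payment_id"] == 12345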

from dataclasses import dataclass

import pytest
from mock_redis import MockRedis

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


@pytest.fixture
def lambda_context():
    @dataclass
    class LambdaContext:
        function_name: str = "test"
        memory_limit_in_mb: int = 128
        invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
        aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

        def get_remaining_time_in_millis(self) -> int:
            return 1000

    return LambdaContext()


def test_idempotent_lambda(lambda_context):
    # Init the Mock redis client
    redis_client = MockRedis(decode_responses=True)
    # Establish persistence layer using the mock redis client
    persistence_layer = RedisCachePersistenceLayer(client=redis_client)

    # setup idempotent with redis persistence layer
    @idempotent(persistence_store=persistence_layer)
    def lambda_handler(event: dict, context: LambdaContext):
        print("expensive operation")
        return {
            "payment_id": 12345,
            "message": "success",
            "statusCode": 200,
        }

    # Invoke the simulated lambda handler
    result = lambda_handler({"testkey": "testvalue"}, lambda_context)
    assert result["payment_id"] == 12345
import time as t
from typing import Dict, Optional


# Mock redis class that includes all operations we used in Idempotency
class MockRedis:
    def __init__(self, decode_responses, cache: Optional[Dict] = None, **kwargs):
        self.cache = cache or {}
        self.expire_dict: Dict = {}
        self.decode_responses = decode_responses
        self.acl: Dict = {}
        self.username = ""

    def hset(self, name, mapping):
        self.expire_dict.pop(name, {})
        self.cache[name] = mapping

    def from_url(self, url: str):
        pass

    def expire(self, name, time):
        self.expire_dict[name] = t.time() + time

    # return {} if no match
    def hgetall(self, name):
        if self.expire_dict.get(name, t.time() + 1) < t.time():
            self.cache.pop(name, {})
        return self.cache.get(name, {})

    def get_connection_kwargs(self):
        return {"decode_responses": self.decode_responses}

    def auth(self, username, **kwargs):
        self.username = username

    def delete(self, name):
        self.cache.pop(name, {})

If you want to set up a real Redis client for integration testing, you can reference the code provided below.

from dataclasses import dataclass

import pytest
import redis

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


@pytest.fixture
def lambda_context():
    @dataclass
    class LambdaContext:
        function_name: str = "test"
        memory_limit_in_mb: int = 128
        invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
        aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

        def get_remaining_time_in_millis(self) -> int:
            return 1000

    return LambdaContext()


@pytest.fixture
def persistence_store_standalone_redis():
    # Init a real Redis client and connect to the port set in the Makefile
    redis_client = redis.Redis(
        host="localhost",
        port=63005,
        decode_responses=True,
    )

    # return a persistence layer with real Redis
    return RedisCachePersistenceLayer(client=redis_client)


def test_idempotent_lambda(lambda_context, persistence_store_standalone_redis):
    # Establish persistence layer using the real redis client
    persistence_layer = persistence_store_standalone_redis

    # setup idempotent with redis persistence layer
    @idempotent(persistence_store=persistence_layer)
    def lambda_handler(event: dict, context: LambdaContext):
        print("expensive operation")
        return {
            "payment_id": 12345,
            "message": "success",
            "statusCode": 200,
        }

    # Invoke the simulated lambda handler
    result = lambda_handler({"testkey": "testvalue"}, lambda_context)
    assert result["payment_id"] == 12345
test-idempotency-redis: # (1)!
    docker run --name test-idempotency-redis -d -p 63005:6379 redis
    pytest test_with_real_redis.py;docker stop test-idempotency-redis;docker rm test-idempotency-redis
  1. Use this script to set up a temporary Redis Docker container and automatically remove it upon completion

Extra resources

If you're interested in a deep dive on how Amazon uses idempotency when building our APIs, check out this article.