Event Source Data Classes

Event Source Data Classes provides self-describing and strongly-typed classes for various AWS Lambda event sources.

Key features

  • Type hinting and code completion for common event types
  • Helper functions for decoding/deserializing nested fields (see the sketch after this list)
  • Docstrings for fields contained in event schemas
  • Standardized attribute-based access to event properties
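
For example, helper properties like json_body handle decoding and deserialization for you. A minimal sketch with an SQS event; the order payload and its "id" field are illustrative:

from aws_lambda_powertools.utilities.data_classes import SQSEvent, event_source


@event_source(data_class=SQSEvent)
def lambda_handler(event: SQSEvent, context):
    for record in event.records:
        # json_body decodes and deserializes the JSON string in the message body
        order = record.json_body
        print(f"Processing order {order['id']} from message {record.message_id}")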

Getting started

Tip

All examples shared in this documentation are available within the project repository.

There are two ways to use Event Source Data Classes in your Lambda functions.

Method 1: Direct Initialization

You can initialize the appropriate data class by passing the Lambda event object to its constructor.

from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent


def lambda_handler(event: dict, context):
    api_event = APIGatewayProxyEvent(event)
    if "hello" in api_event.path and api_event.http_method == "GET":
        return {"statusCode": 200, "body": f"Hello from path: {api_event.path}"}
    else:
        return {"statusCode": 400, "body": "No Hello from path"}
{
    "resource": "/helloworld",
    "path": "/hello",
    "httpMethod": "GET",
    "headers": {
        "Accept": "*/*",
        "Host": "api.example.com"
    },
    "queryStringParameters": {
        "name": "John"
    },
    "pathParameters": null,
    "stageVariables": null,
    "requestContext": {
        "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
        "stage": "prod"
    },
    "body": null,
    "isBase64Encoded": false
}

Method 2: Using the event_source Decorator

Alternatively, you can use the event_source decorator to automatically parse the event.

from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent, event_source


@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context):
    if "hello" in event.path and event.http_method == "GET":
        return {"statusCode": 200, "body": f"Hello from path: {event.path}"}
    else:
        return {"statusCode": 400, "body": "No Hello from path"}
{
    "resource": "/helloworld",
    "path": "/hello",
    "httpMethod": "GET",
    "headers": {
        "Accept": "*/*",
        "Host": "api.example.com"
    },
    "queryStringParameters": {
        "name": "John"
    },
    "pathParameters": null,
    "stageVariables": null,
    "requestContext": {
        "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
        "stage": "prod"
    },
    "body": null,
    "isBase64Encoded": false
}

Autocomplete with self-documented properties and methods

Event Source Data Classes leverages IDE autocompletion and inline documentation. When using the APIGatewayProxyEvent class, for example, the IDE offers autocomplete suggestions for its properties and methods.


Supported event sources

Each event source is linked to its corresponding GitHub file with the full set of properties, methods, and docstrings specific to each event type.

Event Source Data_class Properties
Active MQ ActiveMQEvent Github
API Gateway Authorizer APIGatewayAuthorizerRequestEvent Github
API Gateway Authorizer V2 APIGatewayAuthorizerEventV2 Github
API Gateway Proxy APIGatewayProxyEvent Github
API Gateway Proxy V2 APIGatewayProxyEventV2 Github
Application Load Balancer ALBEvent Github
AppSync Authorizer AppSyncAuthorizerEvent Github
AppSync Resolver AppSyncResolverEvent Github
AWS Config Rule AWSConfigRuleEvent Github
Bedrock Agent BedrockAgentEvent Github
CloudFormation Custom Resource CloudFormationCustomResourceEvent Github
CloudWatch Alarm State Change Action CloudWatchAlarmEvent Github
CloudWatch Dashboard Custom Widget CloudWatchDashboardCustomWidgetEvent Github
CloudWatch Logs CloudWatchLogsEvent Github
CodeDeploy Lifecycle Hook CodeDeployLifecycleHookEvent Github
CodePipeline Job Event CodePipelineJobEvent Github
Cognito User Pool Multiple available under cognito_user_pool_event Github
Connect Contact Flow ConnectContactFlowEvent Github
DynamoDB streams DynamoDBStreamEvent, DynamoDBRecordEventName Github
EventBridge EventBridgeEvent Github
Kafka KafkaEvent Github
Kinesis Data Stream KinesisStreamEvent Github
Kinesis Firehose Delivery Stream KinesisFirehoseEvent Github
Lambda Function URL LambdaFunctionUrlEvent Github
Rabbit MQ RabbitMQEvent Github
S3 S3Event Github
S3 Batch Operations S3BatchOperationEvent Github
S3 Object Lambda S3ObjectLambdaEvent Github
S3 EventBridge Notification S3EventBridgeNotificationEvent Github
SES SESEvent Github
SNS SNSEvent Github
SQS SQSEvent Github
Transfer Family Authorizer TransferFamilyAuthorizer Github
Transfer Family Authorizer Response TransferFamilyAuthorizerResponse Github
VPC Lattice V2 VPCLatticeV2Event Github
VPC Lattice V1 VPCLatticeEvent Github
Info

The examples showcase a subset of Event Source Data Classes capabilities. For comprehensive details, leverage your IDE's autocompletion, refer to the type hints and docstrings, and explore the full API reference for complete property listings of each event source.

Active MQ

It is used for Active MQ payloads; see also the AWS blog post for more details.

import json

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent

logger = Logger()


@event_source(data_class=ActiveMQEvent)
def lambda_handler(event: ActiveMQEvent, context):
    for message in event.messages:
        msg = message.message_id
        msg_pn = message.destination_physicalname

        logger.info(f"Message ID: {msg} and physical name: {msg_pn}")

    return {"statusCode": 200, "body": json.dumps("Processing complete")}
{
  "eventSource": "aws:amq",
  "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",
  "messages": [
    {
      "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
      "messageType": "jms/text-message",
      "data": "QUJDOkFBQUE=",
      "connectionId": "myJMSCoID",
      "redelivered": false,
      "destination": {
        "physicalName": "testQueue"
      },
      "timestamp": 1598827811958,
      "brokerInTime": 1598827811958,
      "brokerOutTime": 1598827811959,
      "properties": {
        "testKey": "testValue"
      }
    },
    {
      "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
      "messageType": "jms/text-message",
      "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==",
      "connectionId": "myJMSCoID2",
      "redelivered": false,
      "destination": {
        "physicalName": "testQueue"
      },
      "timestamp": 1598827811958,
      "brokerInTime": 1598827811958,
      "brokerOutTime": 1598827811959,
      "properties": {
        "testKey": "testValue"
      }

    },
    {
      "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
      "messageType": "jms/bytes-message",
      "data": "3DTOOW7crj51prgVLQaGQ82S48k=",
      "connectionId": "myJMSCoID1",
      "persistent": false,
      "destination": {
        "physicalName": "testQueue"
      },
      "timestamp": 1598827811958,
      "brokerInTime": 1598827811958,
      "brokerOutTime": 1598827811959,
      "properties": {
        "testKey": "testValue"
      }
    }
  ]
}

API Gateway Authorizer

It is used for the API Gateway REST API Lambda Authorizer payload.

Use APIGatewayAuthorizerRequestEvent for type REQUEST and APIGatewayAuthorizerTokenEvent for type TOKEN.

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
    APIGatewayAuthorizerRequestEvent,
    APIGatewayAuthorizerResponse,
)


@event_source(data_class=APIGatewayAuthorizerRequestEvent)
def lambda_handler(event: APIGatewayAuthorizerRequestEvent, context):
    # Simple auth check (replace with your actual auth logic)
    is_authorized = event.headers.get("HeaderAuth1") == "headerValue1"

    if not is_authorized:
        return {"principalId": "", "policyDocument": {"Version": "2012-10-17", "Statement": []}}

    arn = event.parsed_arn

    policy = APIGatewayAuthorizerResponse(
        principal_id="user",
        context={"user": "example"},
        region=arn.region,
        aws_account_id=arn.aws_account_id,
        api_id=arn.api_id,
        stage=arn.stage,
    )

    policy.allow_all_routes()

    return policy.asdict()
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
    APIGatewayAuthorizerRequestEvent,
    APIGatewayAuthorizerResponseWebSocket,
)


@event_source(data_class=APIGatewayAuthorizerRequestEvent)
def lambda_handler(event: APIGatewayAuthorizerRequestEvent, context):
    # Simple auth check (replace with your actual auth logic)
    is_authorized = event.headers.get("HeaderAuth1") == "headerValue1"

    if not is_authorized:
        return {"principalId": "", "policyDocument": {"Version": "2012-10-17", "Statement": []}}

    arn = event.parsed_arn

    policy = APIGatewayAuthorizerResponseWebSocket(
        principal_id="user",
        context={"user": "example"},
        region=arn.region,
        aws_account_id=arn.aws_account_id,
        api_id=arn.api_id,
        stage=arn.stage,
    )

    policy.allow_all_routes()

    return policy.asdict()
{
  "version": "1.0",
  "type": "REQUEST",
  "methodArn": "arn:aws:execute-api:us-east-1:123456789012:abcdef123/test/GET/request",
  "identitySource": "user1,123",
  "authorizationToken": "user1,123",
  "resource": "/request",
  "path": "/request",
  "httpMethod": "GET",
  "headers": {
    "X-AMZ-Date": "20170718T062915Z",
    "Accept": "*/*",
    "HeaderAuth1": "headerValue1",
    "CloudFront-Viewer-Country": "US",
    "CloudFront-Forwarded-Proto": "https",
    "CloudFront-Is-Tablet-Viewer": "false",
    "CloudFront-Is-Mobile-Viewer": "false",
    "User-Agent": "..."
  },
  "multiValueHeaders": {
    "Header1": [
      "value1"
    ],
    "Origin": [
      "https://aws.amazon.com"
    ],
    "Header2": [
      "value1",
      "value2"
    ]
  },
  "queryStringParameters": {
    "QueryString1": "queryValue1"
  },
  "pathParameters": {},
  "stageVariables": {
    "StageVar1": "stageValue1"
  },
  "requestContext": {
    "accountId": "123456789012",
    "apiId": "abcdef123",
    "domainName": "3npb9j1tlk.execute-api.us-west-1.amazonaws.com",
    "domainPrefix": "3npb9j1tlk",
    "extendedRequestId": "EXqgWgXxSK4EJug=",
    "httpMethod": "GET",
    "identity": {
      "accessKey": null,
      "accountId": null,
      "caller": null,
      "cognitoAmr": null,
      "cognitoAuthenticationProvider": null,
      "cognitoAuthenticationType": null,
      "cognitoIdentityId": null,
      "cognitoIdentityPoolId": null,
      "principalOrgId": null,
      "apiKey": "...",
      "sourceIp": "test-invoke-source-ip",
      "user": null,
      "userAgent": "PostmanRuntime/7.28.3",
      "userArn": null,
      "clientCert": {
        "clientCertPem": "CERT_CONTENT",
        "subjectDN": "www.example.com",
        "issuerDN": "Example issuer",
        "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1",
        "validity": {
          "notBefore": "May 28 12:30:02 2019 GMT",
          "notAfter": "Aug  5 09:36:04 2021 GMT"
        }
      }
    },
    "path": "/request",
    "protocol": "HTTP/1.1",
    "requestId": "EXqgWgXxSK4EJug=",
    "requestTime": "20/Aug/2021:14:36:50 +0000",
    "requestTimeEpoch": 1629470210043,
    "resourceId": "ANY /request",
    "resourcePath": "/request",
    "stage": "test"
  }
}
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
    APIGatewayAuthorizerResponse,
    APIGatewayAuthorizerTokenEvent,
)


@event_source(data_class=APIGatewayAuthorizerTokenEvent)
def lambda_handler(event: APIGatewayAuthorizerTokenEvent, context):
    # Simple token check (replace with your actual token validation logic)
    is_valid_token = event.authorization_token == "allow"

    if not is_valid_token:
        return {"principalId": "", "policyDocument": {"Version": "2012-10-17", "Statement": []}}

    arn = event.parsed_arn

    policy = APIGatewayAuthorizerResponse(
        principal_id="user",
        context={"user": "example"},
        region=arn.region,
        aws_account_id=arn.aws_account_id,
        api_id=arn.api_id,
        stage=arn.stage,
    )

    policy.allow_all_routes()

    return policy.asdict()
{
  "type": "TOKEN",
  "authorizationToken": "allow",
  "methodArn": "arn:aws:execute-api:us-west-2:123456789012:ymy8tbxw7b/*/GET/"
}

API Gateway Authorizer V2

It is used for the API Gateway HTTP API Lambda Authorizer payload, version 2. See also this blog post for more details.

from secrets import compare_digest

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
    APIGatewayAuthorizerEventV2,
    APIGatewayAuthorizerResponseV2,
)


def get_user_by_token(token):
    if compare_digest(token, "value"):
        return {"name": "Foo"}
    return None


@event_source(data_class=APIGatewayAuthorizerEventV2)
def lambda_handler(event: APIGatewayAuthorizerEventV2, context):
    user = get_user_by_token(event.headers.get("Authorization"))

    if user is None:
        # No user was found, so we return not authorized
        return APIGatewayAuthorizerResponseV2(authorize=False).asdict()

    # Found the user and setting the details in the context
    response = APIGatewayAuthorizerResponseV2(
        authorize=True,
        context=user,
    )

    return response.asdict()
{
  "version": "2.0",
  "type": "REQUEST",
  "routeArn": "arn:aws:execute-api:us-east-1:123456789012:abcdef123/test/GET/request",
  "identitySource": ["user1", "123"],
  "routeKey": "GET /merchants",
  "rawPath": "/merchants",
  "rawQueryString": "parameter1=value1&parameter1=value2&parameter2=value",
  "cookies": ["cookie1", "cookie2"],
  "headers": {
    "x-amzn-trace-id": "Root=1-611cc4a7-0746ebee281cfd967db97b64",
    "Header1": "value1",
    "Header2": "value2",
    "Authorization": "value"
  },
  "queryStringParameters": {
    "parameter1": "value1,value2",
    "parameter2": "value"
  },
  "requestContext": {
    "accountId": "123456789012",
    "apiId": "api-id",
    "authentication": {
      "clientCert": {
        "clientCertPem": "CERT_CONTENT",
        "subjectDN": "www.example.com",
        "issuerDN": "Example issuer",
        "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1",
        "validity": {
          "notBefore": "May 28 12:30:02 2019 GMT",
          "notAfter": "Aug  5 09:36:04 2021 GMT"
        }
      }
    },
    "domainName": "id.execute-api.us-east-1.amazonaws.com",
    "domainPrefix": "id",
    "http": {
      "method": "POST",
      "path": "/merchants",
      "protocol": "HTTP/1.1",
      "sourceIp": "10.10.10.10",
      "userAgent": "agent"
    },
    "requestId": "id",
    "routeKey": "GET /merchants",
    "stage": "$default",
    "time": "12/Mar/2020:19:03:58 +0000",
    "timeEpoch": 1583348638390
  },
  "pathParameters": { "parameter1": "value1" },
  "stageVariables": { "stageVariable1": "value1", "stageVariable2": "value2" }
}

API Gateway Proxy

It is used for either API Gateway REST API or HTTP API using the v1 proxy event.

from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent, event_source


@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context):
    if "hello" in event.path and event.http_method == "GET":
        return {"statusCode": 200, "body": f"Hello from path: {event.path}"}
    else:
        return {"statusCode": 400, "body": "No Hello from path"}
{
    "resource": "/helloworld",
    "path": "/hello",
    "httpMethod": "GET",
    "headers": {
        "Accept": "*/*",
        "Host": "api.example.com"
    },
    "queryStringParameters": {
        "name": "John"
    },
    "pathParameters": null,
    "stageVariables": null,
    "requestContext": {
        "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
        "stage": "prod"
    },
    "body": null,
    "isBase64Encoded": false
}

API Gateway Proxy V2

It is used for HTTP API using the v2 proxy event.

from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEventV2, event_source


@event_source(data_class=APIGatewayProxyEventV2)
def lambda_handler(event: APIGatewayProxyEventV2, context):
    if "hello" in event.path and event.http_method == "POST":
        return {"statusCode": 200, "body": f"Hello from path: {event.path}"}
    else:
        return {"statusCode": 400, "body": "No Hello from path"}
{
  "version": "2.0",
  "routeKey": "$default",
  "rawPath": "/my/path",
  "rawQueryString": "parameter1=value1&parameter1=value2&parameter2=value",
  "cookies": [
    "cookie1",
    "cookie2"
  ],
  "headers": {
    "Header1": "value1",
    "Header2": "value1,value2"
  },
  "queryStringParameters": {
    "parameter1": "value1,value2",
    "parameter2": "value"
  },
  "requestContext": {
    "accountId": "123456789012",
    "apiId": "api-id",
    "authentication": {
      "clientCert": {
        "clientCertPem": "CERT_CONTENT",
        "subjectDN": "www.example.com",
        "issuerDN": "Example issuer",
        "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1",
        "validity": {
          "notBefore": "May 28 12:30:02 2019 GMT",
          "notAfter": "Aug  5 09:36:04 2021 GMT"
        }
      }
    },
    "authorizer": {
      "jwt": {
        "claims": {
          "claim1": "value1",
          "claim2": "value2"
        },
        "scopes": [
          "scope1",
          "scope2"
        ]
      }
    },
    "domainName": "id.execute-api.us-east-1.amazonaws.com",
    "domainPrefix": "id",
    "http": {
      "method": "POST",
      "path": "/my/path",
      "protocol": "HTTP/1.1",
      "sourceIp": "192.168.0.1/32",
      "userAgent": "agent"
    },
    "requestId": "id",
    "routeKey": "$default",
    "stage": "$default",
    "time": "12/Mar/2020:19:03:58 +0000",
    "timeEpoch": 1583348638390
  },
  "body": "{\"message\": \"hello world\", \"username\": \"tom\"}",
  "pathParameters": {
    "parameter1": "value1"
  },
  "isBase64Encoded": false,
  "stageVariables": {
    "stageVariable1": "value1",
    "stageVariable2": "value2"
  }
}

Application Load Balancer

It is used for Application Load Balancer events.

from aws_lambda_powertools.utilities.data_classes import ALBEvent, event_source


@event_source(data_class=ALBEvent)
def lambda_handler(event: ALBEvent, context):
    if "lambda" in event.path and event.http_method == "GET":
        return {"statusCode": 200, "body": f"Hello from path: {event.path}"}
    else:
        return {"statusCode": 400, "body": "No Hello from path"}
{
  "requestContext": {
    "elb": {
      "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/lambda-279XGJDqGZ5rsrHC2Fjr/49e9d65c45c6791a"
    }
  },
  "httpMethod": "GET",
  "path": "/lambda",
  "queryStringParameters": {
    "query": "1234ABCD"
  },
  "headers": {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
    "accept-encoding": "gzip",
    "accept-language": "en-US,en;q=0.9",
    "connection": "keep-alive",
    "host": "lambda-alb-123578498.us-east-2.elb.amazonaws.com",
    "upgrade-insecure-requests": "1",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",
    "x-amzn-trace-id": "Root=1-5c536348-3d683b8b04734faae651f476",
    "x-forwarded-for": "72.12.164.125",
    "x-forwarded-port": "80",
    "x-forwarded-proto": "http",
    "x-imforwards": "20"
  },
  "body": "Test",
  "isBase64Encoded": false
}

AppSync Authorizer

Used when building an AWS_LAMBDA Authorization with AppSync. See blog post Introducing Lambda authorization for AWS AppSync GraphQL APIs or read the Amplify documentation on using AWS Lambda for authorization with AppSync.

from typing import Dict

from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.logging.logger import Logger
from aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event import (
    AppSyncAuthorizerEvent,
    AppSyncAuthorizerResponse,
)
from aws_lambda_powertools.utilities.data_classes.event_source import event_source

logger = Logger()


def get_user_by_token(token: str):
    """Look a user by token"""
    ...


@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER)
@event_source(data_class=AppSyncAuthorizerEvent)
def lambda_handler(event: AppSyncAuthorizerEvent, context) -> Dict:
    user = get_user_by_token(event.authorization_token)

    if not user:
        # No user found, return not authorized
        return AppSyncAuthorizerResponse().asdict()

    return AppSyncAuthorizerResponse(
        authorize=True,
        resolver_context={"id": user.id},
        # Only allow admins to delete events
        deny_fields=None if user.is_admin else ["Mutation.deleteEvent"],
    ).asdict()
{
    "authorizationToken": "BE9DC5E3-D410-4733-AF76-70178092E681",
    "requestContext": {
        "apiId": "giy7kumfmvcqvbedntjwjvagii",
        "accountId": "254688921111",
        "requestId": "b80ed838-14c6-4500-b4c3-b694c7bef086",
        "queryString": "mutation MyNewTask($desc: String!) {\n  createTask(description: $desc, owner: \"ccc\", taskStatus: \"cc\", title: \"ccc\") {\n    id\n  }\n}\n",
        "operationName": "MyNewTask",
        "variables": {
            "desc": "Foo"
        }
    }
}

AppSync Resolver

Used when building Lambda GraphQL Resolvers with Amplify GraphQL Transform Library (@function), and AppSync Direct Lambda Resolvers.

The example serves as an AppSync resolver for the locations field of the Merchant type. It uses the @event_source decorator to parse the AppSync event, handles pagination and filtering for locations, and demonstrates AppSyncIdentityCognito.

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import (
    AppSyncIdentityCognito,
    AppSyncResolverEvent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=AppSyncResolverEvent)
def lambda_handler(event: AppSyncResolverEvent, context: LambdaContext):
    # Access the AppSync event details
    type_name = event.type_name
    field_name = event.field_name
    arguments = event.arguments
    source = event.source

    print(f"Resolving field '{field_name}' for type '{type_name}'")
    print(f"Arguments: {arguments}")
    print(f"Source: {source}")

    # Check if the identity is Cognito-based
    if isinstance(event.identity, AppSyncIdentityCognito):
        user_id = event.identity.sub
        username = event.identity.username
        print(f"Request from Cognito user: {username} (ID: {user_id})")
    else:
        print("Request is not from a Cognito-authenticated user")

    if type_name == "Merchant" and field_name == "locations":
        page = arguments.get("page", 1)
        size = arguments.get("size", 10)
        name_filter = arguments.get("name")

        # Here you would typically fetch locations from a database
        # This is a mock implementation
        locations = [
            {"id": "1", "name": "Location 1", "address": "123 Main St"},
            {"id": "2", "name": "Location 2", "address": "456 Elm St"},
            {"id": "3", "name": "Location 3", "address": "789 Oak St"},
        ]

        # Apply name filter if provided
        if name_filter:
            locations = [loc for loc in locations if name_filter.lower() in loc["name"].lower()]

        # Apply pagination
        start = (page - 1) * size
        end = start + size
        paginated_locations = locations[start:end]

        return {
            "items": paginated_locations,
            "totalCount": len(locations),
            "nextToken": str(page + 1) if end < len(locations) else None,
        }
    else:
        raise Exception(f"Unhandled field: {field_name} for type: {type_name}")
{
  "typeName": "Merchant",
  "fieldName": "locations",
  "arguments": {
    "page": 2,
    "size": 1,
    "name": "value"
  },
  "identity": {
    "claims": {
      "sub": "07920713-4526-4642-9c88-2953512de441",
      "iss": "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_POOL_ID",
      "aud": "58rc9bf5kkti90ctmvioppukm9",
      "event_id": "7f4c9383-abf6-48b7-b821-91643968b755",
      "token_use": "id",
      "auth_time": 1615366261,
      "name": "Michael Brewer",
      "exp": 1615369861,
      "iat": 1615366261
    },
    "defaultAuthStrategy": "ALLOW",
    "groups": null,
    "issuer": "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_POOL_ID",
    "sourceIp": [
      "11.215.2.22"
    ],
    "sub": "07920713-4526-4642-9c88-2953512de441",
    "username": "mike"
  },
  "source": {
    "name": "Value",
    "nested": {
      "name": "value",
      "list": []
    }
  },
  "request": {
    "headers": {
      "x-forwarded-for": "11.215.2.22, 64.44.173.11",
      "cloudfront-viewer-country": "US",
      "cloudfront-is-tablet-viewer": "false",
      "via": "2.0 SOMETHING.cloudfront.net (CloudFront)",
      "cloudfront-forwarded-proto": "https",
      "origin": "https://console.aws.amazon.com",
      "content-length": "156",
      "accept-language": "en-US,en;q=0.9",
      "host": "SOMETHING.appsync-api.us-east-1.amazonaws.com",
      "x-forwarded-proto": "https",
      "sec-gpc": "1",
      "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) etc.",
      "accept": "*/*",
      "cloudfront-is-mobile-viewer": "false",
      "cloudfront-is-smarttv-viewer": "false",
      "accept-encoding": "gzip, deflate, br",
      "referer": "https://console.aws.amazon.com/",
      "content-type": "application/json",
      "sec-fetch-mode": "cors",
      "x-amz-cf-id": "Fo5VIuvP6V6anIEt62WzFDCK45mzM4yEdpt5BYxOl9OFqafd-WR0cA==",
      "x-amzn-trace-id": "Root=1-60488877-0b0c4e6727ab2a1c545babd0",
      "authorization": "AUTH-HEADER",
      "sec-fetch-dest": "empty",
      "x-amz-user-agent": "AWS-Console-AppSync/",
      "cloudfront-is-desktop-viewer": "true",
      "sec-fetch-site": "cross-site",
      "x-forwarded-port": "443"
    }
  },
  "prev": {
    "result": {}
  }
}

AWS Config Rule

The example utilizes AWSConfigRuleEvent to parse the incoming event. The function logs the message type of the invoking event and returns a simple success response. The example event carries a Scheduled Notification, but the invoking event could also be a configuration item change or an oversized configuration item change notification; a routing sketch follows the sample event.

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
    AWSConfigRuleEvent,
    event_source,
)

logger = Logger()


@event_source(data_class=AWSConfigRuleEvent)
def lambda_handler(event: AWSConfigRuleEvent, context):
    message_type = event.invoking_event.message_type

    logger.info(f"Logging {message_type} event rule", invoke_event=event.raw_invoking_event)

    return {"Success": "OK"}
{
    "version":"1.0",
    "invokingEvent":"{\"awsAccountId\":\"0123456789012\",\"notificationCreationTime\":\"2023-04-27T13:26:17.741Z\",\"messageType\":\"ScheduledNotification\",\"recordVersion\":\"1.0\"}",
    "ruleParameters":"{\"test\":\"x\"}",
    "resultToken":"eyJlbmNyeXB0ZWREYXRhIjpbLTQyLDEyNiw1MiwtMzcsLTI5LDExNCwxMjYsLTk3LDcxLDIyLC0xMTAsMTEyLC0zMSwtOTMsLTQ5LC0xMDEsODIsMyw1NCw0OSwzLC02OSwtNzEsLTcyLDYyLDgxLC03MiwtODEsNTAsMzUsLTUwLC03NSwtMTE4LC0xMTgsNzcsMTIsLTEsMTQsMTIwLC03MCwxMTAsLTMsNTAsLTYwLDEwNSwtNTcsNDUsMTAyLC0xMDksLTYxLC0xMDEsLTYxLDQsNDcsLTg0LC0yNSwxMTIsNTQsLTcxLC0xMDksNDUsMTksMTIzLC0yNiwxMiwtOTYsLTczLDU0LC0xMDksOTIsNDgsLTU5LC04MywtMzIsODIsLTM2LC05MCwxOSw5OCw3Nyw3OCw0MCw4MCw3OCwtMTA1LDg3LC0xMTMsLTExNiwtNzIsMzAsLTY4LC00MCwtODksMTA5LC0xMDgsLTEyOCwyMiw3Miw3NywtMjEsNzYsODksOTQsLTU5LDgxLC0xMjEsLTEwNywtNjcsNjMsLTcsODIsLTg5LC00NiwtMzQsLTkyLDEyMiwtOTAsMTcsLTEyMywyMCwtODUsLTU5LC03MCw4MSwyNyw2Miw3NCwtODAsODAsMzcsNDAsMTE2LDkxLC0yNCw1MSwtNDEsLTc5LDI4LDEyMCw1MywtMTIyLC04MywxMjYsLTc4LDI1LC05OCwtMzYsMTMsMzIsODYsLTI1LDQ4LDMsLTEwMiwtMTYsMjQsLTMsODUsNDQsLTI4LDE0LDIyLDI3LC0xMjIsMTE4LDEwMSw3Myw1LDE4LDU4LC02NCwyMywtODYsLTExNCwyNCwwLDEwMCwyLDExNywtNjIsLTExOSwtMTI4LDE4LDY1LDkwLDE0LC0xMDIsMjEsODUsMTAwLDExNyw1NSwyOSwxMjcsNTQsNzcsNzIsNzQsMzIsNzgsMywtMTExLDExOCwtNzAsLTg2LDEyNywtNzQsNjAsMjIsNDgsMzcsODcsMTMsMCwtMTA1LDUsLTEyMiwtNzEsLTEwMCwxMDQsLTEyNiwtMTYsNzksLTMwLDEyMCw3NywtNzYsLTQxLC0xMDksMiw5NywtMTAxLC0xLDE1LDEyMywxMTksMTA4LDkxLC0yMCwtMTI1LC05NiwyLC05MiwtMTUsLTY3LC03NiwxMjEsMTA0LDEwNSw2NCwtNjIsMTAyLDgsNCwxMjEsLTQ1LC04MCwtODEsLTgsMTE4LDQ0LC04MiwtNDEsLTg0LDczLC0zNiwxMTcsODAsLTY5LC03MywxNCwtMTgsNzIsMzEsLTUsLTExMSwtMTI3LC00MywzNCwtOCw1NywxMDMsLTQyLDE4LC0zMywxMTcsLTI2LC0xMjQsLTEyNCwxNSw4OCwyMywxNiwtNTcsNTQsLTYsLTEwMiwxMTYsLTk5LC00NSwxMDAsLTM1LDg3LDM3LDYsOTgsMiwxMTIsNjAsLTMzLDE3LDI2LDk5LC0xMDUsNDgsLTEwNCwtMTE5LDc4LDYsLTU4LDk1LDksNDEsLTE2LDk2LDQxLC0yMiw5Niw3MiwxMTYsLTk1LC0xMDUsLTM2LC0xMjMsLTU1LDkxLC00NiwtNywtOTIsMzksNDUsODQsMTYsLTEyNCwtMTIyLC02OCwxLC0yOCwxMjIsLTYwLDgyLDEwMywtNTQsLTkyLDI3LC05OSwtMTI4LDY1LDcsLTcyLC0xMjcsNjIsLTIyLDIsLTExLDE4LC04OSwtMTA2LC03NCw3MSw4NiwtMTE2LC0yNSwtMTE1LC05Niw1NywtMzQsMjIsLTEyNCwtMTI1LC00LC00MSw0MiwtNTcsLTEwMyw0NSw3OCwxNCwtMTA2LDExMSw5OCwtOTQsLTcxLDUsNzUsMTksLTEyNCwtMzAsMzQsLTUwLDc1LC04NCwtNTAsLTU2LDUxLC0xNSwtMzYsNjEsLTk0LC03OSwtNDUsMTI2LC03NywtMTA1LC0yLC05MywtNiw4LC0zLDYsLTQyLDQ2LDEyNSw1LC05OCwxMyw2NywtMTAsLTEzLC05NCwtNzgsLTEyNywxMjEsLTI2LC04LC0xMDEsLTkxLDEyMSwtNDAsLTEyNCwtNjQsODQsLTcyLDYzLDE5LC04NF0sIm1hdGVyaWFsU2V0U2VyaWFsTnVtYmVyIjoxLCJpdlBhcmFtZXRlclNwZWMiOnsiaXYiOlszLC0xMCwtODUsMTE0LC05MCwxMTUsNzcsNTUsNTQsMTUsMzgsODQsLTExNiwxNCwtNDAsMjhdfX0=",
    "eventLeftScope":false,
    "executionRoleArn":"arn:aws:iam::0123456789012:role/aws-service-role/config.amazonaws.com/AWSServiceRoleForConfig",
    "configRuleArn":"arn:aws:config:us-east-1:0123456789012:config-rule/config-rule-pdmyw1",
    "configRuleName":"rule-ec2-test",
    "configRuleId":"config-rule-pdmyw1",
    "accountId":"0123456789012",
    "evaluationMode":"DETECTIVE"
 }
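
Because the message type determines what the invoking event contains, a common pattern is to branch on it. A minimal sketch, with illustrative handler logic:

from aws_lambda_powertools.utilities.data_classes import (
    AWSConfigRuleEvent,
    event_source,
)


def handle_scheduled(event: AWSConfigRuleEvent) -> dict:
    # Periodic evaluation triggered on a schedule
    return {"Success": "OK"}


def handle_item_changed(event: AWSConfigRuleEvent) -> dict:
    # Evaluation triggered by a configuration item change
    return {"Success": "OK"}


@event_source(data_class=AWSConfigRuleEvent)
def lambda_handler(event: AWSConfigRuleEvent, context):
    message_type = event.invoking_event.message_type

    if message_type == "ScheduledNotification":
        return handle_scheduled(event)
    if message_type == "ConfigurationItemChangeNotification":
        return handle_item_changed(event)
    # Oversized notifications only carry a summary; fetch the full
    # configuration item from the AWS Config API if you need it
    return {"Success": "OK"}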

Bedrock Agent

The example handles a Bedrock Agent event, using BedrockAgentEvent to parse the incoming payload. The function logs the action group and input text, then returns a structured response compatible with the Bedrock Agent's expected format, including a mock response body.

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import BedrockAgentEvent, event_source

logger = Logger()


@event_source(data_class=BedrockAgentEvent)
def lambda_handler(event: BedrockAgentEvent, context) -> dict:
    input_text = event.input_text

    logger.info(f"Bedrock Agent {event.action_group} invoked with input", input_text=input_text)

    return {
        "message_version": "1.0",
        "responses": [
            {
                "action_group": event.action_group,
                "api_path": event.api_path,
                "http_method": event.http_method,
                "http_status_code": 200,
                "response_body": {"application/json": {"body": "This is the response"}},
            },
        ],
    }
{
  "actionGroup": "ClaimManagementActionGroup",
  "messageVersion": "1.0",
  "sessionId": "12345678912345",
  "sessionAttributes": {},
  "promptSessionAttributes": {},
  "inputText": "I want to claim my insurance",
  "agent": {
    "alias": "TSTALIASID",
    "name": "test",
    "version": "DRAFT",
    "id": "8ZXY0W8P1H"
  },
  "httpMethod": "GET",
  "apiPath": "/claims"
}

CloudFormation Custom Resource

The example focuses on the Create request type, generating a unique physical resource ID and logging the process. The function is structured so that Update and Delete operations can be handled as well; a sketch covering them follows the sample event.

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
    CloudFormationCustomResourceEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=CloudFormationCustomResourceEvent)
def lambda_handler(event: CloudFormationCustomResourceEvent, context: LambdaContext):
    request_type = event.request_type

    if request_type == "Create":
        return on_create(event, context)
    else:
        raise ValueError(f"Invalid request type: {request_type}")


def on_create(event: CloudFormationCustomResourceEvent, context: LambdaContext):
    props = event.resource_properties
    logger.info(f"Create new resource with props {props}.")

    physical_id = f"MyResource-{context.aws_request_id}"

    return {"PhysicalResourceId": physical_id, "Data": {"Message": "Resource created successfully"}}
{
  "RequestType": "Create",
  "ServiceToken": "arn:aws:lambda:us-east-1:xxx:function:xxxx-CrbuiltinfunctionidProvi-2vKAalSppmKe",
  "ResponseURL": "https://cloudformation-custom-resource-response-useast1.s3.amazonaws.com/7F%7Cb1f50fdfc25f3b",
  "StackId": "arn:aws:cloudformation:us-east-1:xxxx:stack/xxxx/271845b0-f2e8-11ed-90ac-0eeb25b8ae21",
  "RequestId": "xxxxx-d2a0-4dfb-ab1f-xxxxxx",
  "LogicalResourceId": "xxxxxxxxx",
  "ResourceType": "Custom::MyType",
  "ResourceProperties": {
    "ServiceToken": "arn:aws:lambda:us-east-1:xxxxx:function:xxxxx",
    "MyProps": "ss"
  }
}
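
To cover Update and Delete as well, you can extend the same routing. A sketch, assuming the data class's request_type and physical_resource_id properties are all you need:

from aws_lambda_powertools.utilities.data_classes import (
    CloudFormationCustomResourceEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


def on_create(event: CloudFormationCustomResourceEvent, context: LambdaContext):
    return {"PhysicalResourceId": f"MyResource-{context.aws_request_id}"}


def on_update(event: CloudFormationCustomResourceEvent, context: LambdaContext):
    # Returning the same physical ID updates the resource in place;
    # returning a new one makes CloudFormation replace it and delete the old one
    return {"PhysicalResourceId": event.physical_resource_id}


def on_delete(event: CloudFormationCustomResourceEvent, context: LambdaContext):
    # Clean up the resource identified by event.physical_resource_id here
    return {"PhysicalResourceId": event.physical_resource_id}


@event_source(data_class=CloudFormationCustomResourceEvent)
def lambda_handler(event: CloudFormationCustomResourceEvent, context: LambdaContext):
    handlers = {"Create": on_create, "Update": on_update, "Delete": on_delete}
    handler = handlers.get(event.request_type)

    if handler is None:
        raise ValueError(f"Invalid request type: {event.request_type}")

    return handler(event, context)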

CloudWatch Dashboard Custom Widget

This example for CloudWatchDashboardCustomWidgetEvent logs the dashboard name, extracts key information like the widget ID and time range, and returns a formatted response with a title and markdown content. Read more about custom widgets for the CloudWatch dashboard.

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import CloudWatchDashboardCustomWidgetEvent, event_source

logger = Logger()


@event_source(data_class=CloudWatchDashboardCustomWidgetEvent)
def lambda_handler(event: CloudWatchDashboardCustomWidgetEvent, context):
    if event.widget_context is None:
        logger.warning("No widget context provided")
        return {"title": "Error", "markdown": "Widget context is missing"}

    logger.info(f"Processing custom widget for dashboard: {event.widget_context.dashboard_name}")

    # Access specific event properties
    widget_id = event.widget_context.widget_id
    time_range = event.widget_context.time_range

    if time_range is None:
        logger.warning("No time range provided")
        return {"title": f"Custom Widget {widget_id}", "markdown": "Time range is missing"}

    # Your custom widget logic here
    return {
        "title": f"Custom Widget {widget_id}",
        "markdown": f"""
        Dashboard: {event.widget_context.dashboard_name}
        Time Range: {time_range.start} to {time_range.end}
        Theme: {event.widget_context.theme or 'default'}
        """,
    }
{
  "original": "param-to-widget",
  "widgetContext": {
    "dashboardName": "Name-of-current-dashboard",
    "widgetId": "widget-16",
    "domain": "https://us-east-1.console.aws.amazon.com",
    "accountId": "123456789123",
    "locale": "en",
    "timezone": {
      "label": "UTC",
      "offsetISO": "+00:00",
      "offsetInMinutes": 0
    },
    "period": 300,
    "isAutoPeriod": true,
    "timeRange": {
      "mode": "relative",
      "start": 1627236199729,
      "end": 1627322599729,
      "relativeStart": 86400012,
      "zoom": {
        "start": 1627276030434,
        "end": 1627282956521
      }
    },
    "theme": "light",
    "linkCharts": true,
    "title": "Tweets for Amazon website problem",
    "forms": {
      "all": {}
    },
    "params": {
      "original": "param-to-widget"
    },
    "width": 588,
    "height": 369
  }
}

CloudWatch Alarm State Change Action

CloudWatch supports Lambda as an alarm state change action. You can use the CloudWatchAlarmEvent data class to access fields such as the alarm information, its current state, and its previous state.

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import CloudWatchAlarmEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=CloudWatchAlarmEvent)
def lambda_handler(event: CloudWatchAlarmEvent, context: LambdaContext) -> dict:
    logger.info(f"Alarm {event.alarm_data.alarm_name} state is {event.alarm_data.state.value}")

    # You can now work with event. For example, you can enrich the received data, and
    # decide on how you want to route the alarm.

    return {
        "name": event.alarm_data.alarm_name,
        "arn": event.alarm_arn,
        "urgent": "Priority: P1" in (event.alarm_data.configuration.description or ""),
    }
{
  "source": "aws.cloudwatch",
  "alarmArn": "arn:aws:cloudwatch:eu-west-1:912397435824:alarm:test_alarm",
  "accountId": "123456789012",
  "time": "2024-02-17T11:53:08.431+0000",
  "region": "eu-west-1",
  "alarmData": {
    "alarmName": "Test alert",
    "state": {
      "value": "ALARM",
      "reason": "Threshold Crossed: 1 out of the last 1 datapoints [1.0 (17/02/24 11:51:00)] was less than the threshold (10.0) (minimum 1 datapoint for OK -> ALARM transition).",
      "reasonData": "{\"version\":\"1.0\",\"queryDate\":\"2024-02-17T11:53:08.423+0000\",\"startDate\":\"2024-02-17T11:51:00.000+0000\",\"statistic\":\"SampleCount\",\"period\":60,\"recentDatapoints\":[1.0],\"threshold\":10.0,\"evaluatedDatapoints\":[{\"timestamp\":\"2024-02-17T11:51:00.000+0000\",\"sampleCount\":1.0,\"value\":1.0}]}",
      "timestamp": "2024-02-17T11:53:08.431+0000"
    },
    "previousState": {
      "value": "OK",
      "reason": "Threshold Crossed: 1 out of the last 1 datapoints [1.0 (17/02/24 11:50:00)] was not greater than the threshold (10.0) (minimum 1 datapoint for ALARM -> OK transition).",
      "reasonData": "{\"version\":\"1.0\",\"queryDate\":\"2024-02-17T11:51:31.460+0000\",\"startDate\":\"2024-02-17T11:50:00.000+0000\",\"statistic\":\"SampleCount\",\"period\":60,\"recentDatapoints\":[1.0],\"threshold\":10.0,\"evaluatedDatapoints\":[{\"timestamp\":\"2024-02-17T11:50:00.000+0000\",\"sampleCount\":1.0,\"value\":1.0}]}",
      "timestamp": "2024-02-17T11:51:31.462+0000"
    },
    "configuration": {
      "description": "This is description **here**",
      "metrics": [
        {
          "id": "e1",
          "expression": "m1/m2",
          "label": "Expression1",
          "returnData": true
        },
        {
          "id": "m1",
          "metricStat": {
            "metric": {
              "namespace": "AWS/Lambda",
              "name": "Invocations",
              "dimensions": {}
            },
            "period": 60,
            "stat": "SampleCount"
          },
          "returnData": false
        },
        {
          "id": "m2",
          "metricStat": {
            "metric": {
              "namespace": "AWS/Lambda",
              "name": "Duration",
              "dimensions": {}
            },
            "period": 60,
            "stat": "SampleCount"
          },
          "returnData": false
        }
      ]
    }
  }
}

CloudWatch Logs

CloudWatch Logs events are compressed and base64-encoded by default. You can use the provided helper function to decode, decompress, and parse the JSON data from the event.

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import CloudWatchLogsEvent, event_source
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData

logger = Logger()


@event_source(data_class=CloudWatchLogsEvent)
def lambda_handler(event: CloudWatchLogsEvent, context):
    decompressed_log: CloudWatchLogsDecodedData = event.parse_logs_data()

    logger.info(f"Log group: {decompressed_log.log_group}")
    logger.info(f"Log stream: {decompressed_log.log_stream}")

    for log_event in decompressed_log.log_events:
        logger.info(f"Timestamp: {log_event.timestamp}, Message: {log_event.message}")

    return {"statusCode": 200, "body": f"Processed {len(decompressed_log.log_events)} log events"}
{
  "awslogs": {
    "data": "H4sIAAAAAAAAAHWPwQqCQBCGX0Xm7EFtK+smZBEUgXoLCdMhFtKV3akI8d0bLYmibvPPN3wz00CJxmQnTO41whwWQRIctmEcB6sQbFC3CjW3XW8kxpOpP+OC22d1Wml1qZkQGtoMsScxaczKN3plG8zlaHIta5KqWsozoTYw3/djzwhpLwivWFGHGpAFe7DL68JlBUk+l7KSN7tCOEJ4M3/qOI49vMHj+zCKdlFqLaU2ZHV2a4Ct/an0/ivdX8oYc1UVX860fQDQiMdxRQEAAA=="
  }
}

Kinesis integration

When streaming CloudWatch Logs to a Kinesis Data Stream (cross-account or not), you can use extract_cloudwatch_logs_from_event to decode, decompress and extract logs as CloudWatchLogsDecodedData to ease log processing.

from typing import List

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
    KinesisStreamEvent,
    extract_cloudwatch_logs_from_event,
)


@event_source(data_class=KinesisStreamEvent)
def lambda_handler(event: KinesisStreamEvent, context):
    logs: List[CloudWatchLogsDecodedData] = extract_cloudwatch_logs_from_event(event)
    for log in logs:
        if log.message_type == "DATA_MESSAGE":
            return "success"
    return "nothing to be processed"
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "da10bf66b1f54bff5d96eae99149ad1f",
                "sequenceNumber": "49635052289529725553291405521504870233219489715332317186",
                "data": "H4sIAAAAAAAAAK2Sa2vbMBSG/4ox+xg3Oror39IlvaztVmJv7WjCUGwl8+ZLZstts5L/vuOsZYUyWGEgJHiP9J7nvOghLF3b2rVLthsXjsLJOBl/uZjG8fh4Gg7C+q5yDcqUAWcSONHEoFzU6+Om7jZYGdq7dljYcpnZ4cZHwLWOJl1Zbs/r9cR6e9RVqc/rKlpXV9eXt+fy27vt8W+L2DfOlr07oXQIMAQyvHlzPk6mcbKgciktF5lQfMU5dZZqzrShLF2uFC60aLtlmzb5prc/ygvvmjYc3YRPFG+LusuurE+/Ikqb1Gd55dq8jV+8isT6+317Rk42J5PTcLFnm966yvd2D2GeISJTYIwCJSQ1BE9OtWZCABWaKMIJAMdDMyU5MYZLhmkxBhQxfY4Re1tiWiAlBsgIVQTE4Cl6tI+T8SwJZu5Hh1dPs1FApOMSDI9WVKmIC+4irTMWQZYpx7QkztrgE06MU4yCx9DmVbgbvABmQJTGtkYAB0NwEwyYQUBpqEFuSbkGrThTRKi/AlP+HHj6fvJa3P9Ap/+Rbja9/PD6POd+0jXW7xM1B8CDsp37w7woXBb8qQDZ6xeurJttEOc/HWpUBxeHKNr74LHwsXXYlsm9flrl/rmFIQeS7m3m1fVs/DlIGpu6nhMiyWQGXNKIMbcCIgkhElKbaZnZpYJUz33s1iV+z/6+StMlR3yphHNcCyxiNEXf2zed6xuEu8XuF2wb6krnAwAA",
                "approximateArrivalTimestamp": 1668093033.744
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49635052289529725553291405521504870233219489715332317186",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49",
            "awsRegion": "eu-west-1",
            "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "cf4c4c2c9a49bdfaf58d7dbbc2b06081",
                "sequenceNumber": "49635052289529725553291405520881064510298312199003701250",
                "data": "H4sIAAAAAAAAAK2SW2/TQBCF/4pl8ViTvc7u5i0laVraQhUbWtREaG1PgsGXYK/bhqr/nXVoBRIgUYnXc2bPfHO092GFXWc3mOy2GI7D6SSZfDyfxfFkPgsPwua2xtbLjFPBgQqiifFy2WzmbdNvvTOyt92otFWa29HWRVRoHU37qtqdNZupdfaorzNXNHW0qS+vLm7O4PPr3fxHROxatNWQThgbUTqiZHT94mySzOJkBUqYLOWY8ZQLbaTRkEvDciUYzWzKfETXp13WFtsh/qgoHbZdOL4OnyhelU2fX1qXffIoXdKcFjV2RRf/9iqSmy933Sk53h5PT8LVnm12g7Ub4u7DIveIXFFjFNGUKUlAaMY0EUJKLjkQbxhKGCWeknMKoAGUkYoJ7TFd4St2tvJtDRYxDAg3VB08Ve/j42SySIIFfu396Ek+DkS+xkwAiYhM00isgUV6jXmEMrM5EmMsh+C9v9hfMQ4eS1vW4cPBH4CZVpoTJkEIAp5RUMo8vGFae3JNCCdUccMVgPw7sP4VePZm+lzc/0AH/0i3mF28fX6fSzftW+v2jZKXRgVVt3SHRVliHvx06F4+x6ppd0FcfEMvMR2cH3rR3gWPxrsO/Vau9vqyvlpMPgRJazMcYGgEHHLKBhLGJaBA0JLxNc0JppoS9Cwxbir/B4d5QDBAQSnfFFGp8aa/vxw2uLbHYUH4sHr4Dj5RJxfMAwAA",
                "approximateArrivalTimestamp": 1668092612.992
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49635052289529725553291405520881064510298312199003701250",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49",
            "awsRegion": "eu-west-1",
            "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG"
        }
    ]
}

Alternatively, you can use extract_cloudwatch_logs_from_record to seamlessly integrate with the Batch utility for more robust log processing.

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
    KinesisStreamRecord,
    extract_cloudwatch_logs_from_record,
)

logger = Logger()

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)


def record_handler(record: KinesisStreamRecord):
    log = extract_cloudwatch_logs_from_record(record)
    logger.info(f"Message type: {log.message_type}")
    return log.message_type == "DATA_MESSAGE"


def lambda_handler(event, context):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "da10bf66b1f54bff5d96eae99149ad1f",
                "sequenceNumber": "49635052289529725553291405521504870233219489715332317186",
                "data": "H4sIAAAAAAAAAK2Sa2vbMBSG/4ox+xg3Oror39IlvaztVmJv7WjCUGwl8+ZLZstts5L/vuOsZYUyWGEgJHiP9J7nvOghLF3b2rVLthsXjsLJOBl/uZjG8fh4Gg7C+q5yDcqUAWcSONHEoFzU6+Om7jZYGdq7dljYcpnZ4cZHwLWOJl1Zbs/r9cR6e9RVqc/rKlpXV9eXt+fy27vt8W+L2DfOlr07oXQIMAQyvHlzPk6mcbKgciktF5lQfMU5dZZqzrShLF2uFC60aLtlmzb5prc/ygvvmjYc3YRPFG+LusuurE+/Ikqb1Gd55dq8jV+8isT6+317Rk42J5PTcLFnm966yvd2D2GeISJTYIwCJSQ1BE9OtWZCABWaKMIJAMdDMyU5MYZLhmkxBhQxfY4Re1tiWiAlBsgIVQTE4Cl6tI+T8SwJZu5Hh1dPs1FApOMSDI9WVKmIC+4irTMWQZYpx7QkztrgE06MU4yCx9DmVbgbvABmQJTGtkYAB0NwEwyYQUBpqEFuSbkGrThTRKi/AlP+HHj6fvJa3P9Ap/+Rbja9/PD6POd+0jXW7xM1B8CDsp37w7woXBb8qQDZ6xeurJttEOc/HWpUBxeHKNr74LHwsXXYlsm9flrl/rmFIQeS7m3m1fVs/DlIGpu6nhMiyWQGXNKIMbcCIgkhElKbaZnZpYJUz33s1iV+z/6+StMlR3yphHNcCyxiNEXf2zed6xuEu8XuF2wb6krnAwAA",
                "approximateArrivalTimestamp": 1668093033.744
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49635052289529725553291405521504870233219489715332317186",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49",
            "awsRegion": "eu-west-1",
            "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "cf4c4c2c9a49bdfaf58d7dbbc2b06081",
                "sequenceNumber": "49635052289529725553291405520881064510298312199003701250",
                "data": "H4sIAAAAAAAAAK2SW2/TQBCF/4pl8ViTvc7u5i0laVraQhUbWtREaG1PgsGXYK/bhqr/nXVoBRIgUYnXc2bPfHO092GFXWc3mOy2GI7D6SSZfDyfxfFkPgsPwua2xtbLjFPBgQqiifFy2WzmbdNvvTOyt92otFWa29HWRVRoHU37qtqdNZupdfaorzNXNHW0qS+vLm7O4PPr3fxHROxatNWQThgbUTqiZHT94mySzOJkBUqYLOWY8ZQLbaTRkEvDciUYzWzKfETXp13WFtsh/qgoHbZdOL4OnyhelU2fX1qXffIoXdKcFjV2RRf/9iqSmy933Sk53h5PT8LVnm12g7Ub4u7DIveIXFFjFNGUKUlAaMY0EUJKLjkQbxhKGCWeknMKoAGUkYoJ7TFd4St2tvJtDRYxDAg3VB08Ve/j42SySIIFfu396Ek+DkS+xkwAiYhM00isgUV6jXmEMrM5EmMsh+C9v9hfMQ4eS1vW4cPBH4CZVpoTJkEIAp5RUMo8vGFae3JNCCdUccMVgPw7sP4VePZm+lzc/0AH/0i3mF28fX6fSzftW+v2jZKXRgVVt3SHRVliHvx06F4+x6ppd0FcfEMvMR2cH3rR3gWPxrsO/Vau9vqyvlpMPgRJazMcYGgEHHLKBhLGJaBA0JLxNc0JppoS9Cwxbir/B4d5QDBAQSnfFFGp8aa/vxw2uLbHYUH4sHr4Dj5RJxfMAwAA",
                "approximateArrivalTimestamp": 1668092612.992
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49635052289529725553291405520881064510298312199003701250",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49",
            "awsRegion": "eu-west-1",
            "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG"
        }
    ]
}

CodeDeploy Lifecycle Hook

CodeDeploy invokes Lambda functions with this event when hooks are defined in an AppSpec file, letting you test the application at different stages of a deployment.

from aws_lambda_powertools.utilities.data_classes import CodeDeployLifecycleHookEvent, event_source


@event_source(data_class=CodeDeployLifecycleHookEvent)
def lambda_handler(event: CodeDeployLifecycleHookEvent, context):
    deployment_id = event.deployment_id
    lifecycle_event_hook_execution_id = event.lifecycle_event_hook_execution_id

    return {"deployment_id": deployment_id, "lifecycle_event_hook_execution_id": lifecycle_event_hook_execution_id}
{
    "DeploymentId": "d-ABCDEF",
    "LifecycleEventHookExecutionId": "xxxxxxxxxxxxxxxxxxxxxxxx"
}
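
The hook only completes once the function reports a status back to CodeDeploy. A minimal sketch using boto3's put_lifecycle_event_hook_execution_status; the validation step itself is a placeholder:

import boto3

from aws_lambda_powertools.utilities.data_classes import CodeDeployLifecycleHookEvent, event_source

codedeploy = boto3.client("codedeploy")


@event_source(data_class=CodeDeployLifecycleHookEvent)
def lambda_handler(event: CodeDeployLifecycleHookEvent, context):
    # Run your validation tests against the new application version here
    tests_passed = True  # placeholder for real checks

    codedeploy.put_lifecycle_event_hook_execution_status(
        deploymentId=event.deployment_id,
        lifecycleEventHookExecutionId=event.lifecycle_event_hook_execution_id,
        status="Succeeded" if tests_passed else "Failed",
    )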

CodePipeline Job

Data classes and utility functions to help create continuous delivery pipeline tasks with AWS Lambda.

from aws_lambda_powertools.utilities.data_classes import CodePipelineJobEvent, event_source


@event_source(data_class=CodePipelineJobEvent)
def lambda_handler(event: CodePipelineJobEvent, context):
    job_id = event.get_id

    input_bucket = event.input_bucket_name

    return {"statusCode": 200, "body": f"Processed job {job_id} from bucket {input_bucket}"}
{
    "CodePipeline.job": {
        "id": "11111111-abcd-1111-abcd-111111abcdef",
        "accountId": "111111111111",
        "data": {
            "actionConfiguration": {
                "configuration": {
                    "FunctionName": "MyLambdaFunctionForAWSCodePipeline",
                    "UserParameters": "some-input-such-as-a-URL"
                }
            },
            "inputArtifacts": [
                {
                    "name": "ArtifactName",
                    "revision": null,
                    "location": {
                        "type": "S3",
                        "s3Location": {
                            "bucketName": "the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890",
                            "objectKey": "the name of the application, for example CodePipelineDemoApplication.zip"
                        }
                    }
                }
            ],
            "outputArtifacts": [],
            "artifactCredentials": {
                "accessKeyId": "AKIAIOSFODNN7EXAMPLE",
                "secretAccessKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
                "sessionToken": "MIICiTCCAfICCQD6m7oRw0uXOjANBgkqhkiG9w0BAQUFADCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wHhcNMTEwNDI1MjA0NTIxWhcNMTIwNDI0MjA0NTIxWjCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMaK0dn+a4GmWIWJ21uUSfwfEvySWtC2XADZ4nB+BLYgVIk60CpiwsZ3G93vUEIO3IyNoH/f0wYK8m9TrDHudUZg3qX4waLG5M43q7Wgc/MbQITxOUSQv7c7ugFFDzQGBzZswY6786m86gpEIbb3OhjZnzcvQAaRHhdlQWIMm2nrAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAtCu4nUhVVxYUntneD9+h8Mg9q6q+auNKyExzyLwaxlAoo7TJHidbtS4J5iNmZgXL0FkbFFBjvSfpJIlJ00zbhNYS5f6GuoEDmFJl0ZxBHjJnyp378OD8uTs7fLvjx79LjSTbNYiytVbZPQUQ5Yaxu2jXnimvw3rrszlaEXAMPLE="
            },
            "continuationToken": "A continuation token if continuing job"
        }
    }
}
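
CodePipeline keeps the action in progress until the function reports a result. A sketch of that pattern using boto3's put_job_success_result and put_job_failure_result; the task itself is elided:

import boto3

from aws_lambda_powertools.utilities.data_classes import CodePipelineJobEvent, event_source

codepipeline = boto3.client("codepipeline")


@event_source(data_class=CodePipelineJobEvent)
def lambda_handler(event: CodePipelineJobEvent, context):
    try:
        # Perform the pipeline task here, e.g. using event.input_bucket_name
        codepipeline.put_job_success_result(jobId=event.get_id)
    except Exception as exc:
        codepipeline.put_job_failure_result(
            jobId=event.get_id,
            failureDetails={"type": "JobFailed", "message": str(exc)},
        )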

Cognito User Pool

Cognito User Pools have several different Lambda trigger sources, each of which maps to a different data class that can be imported from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event:

Trigger/Event Source Data Class
Custom message event data_classes.cognito_user_pool_event.CustomMessageTriggerEvent
Post authentication data_classes.cognito_user_pool_event.PostAuthenticationTriggerEvent
Post confirmation data_classes.cognito_user_pool_event.PostConfirmationTriggerEvent
Pre authentication data_classes.cognito_user_pool_event.PreAuthenticationTriggerEvent
Pre sign-up data_classes.cognito_user_pool_event.PreSignUpTriggerEvent
Pre token generation data_classes.cognito_user_pool_event.PreTokenGenerationTriggerEvent
Pre token generation V2 data_classes.cognito_user_pool_event.PreTokenGenerationV2TriggerEvent
User migration data_classes.cognito_user_pool_event.UserMigrationTriggerEvent
Define Auth Challenge data_classes.cognito_user_pool_event.DefineAuthChallengeTriggerEvent
Create Auth Challenge data_classes.cognito_user_pool_event.CreateAuthChallengeTriggerEvent
Verify Auth Challenge data_classes.cognito_user_pool_event.VerifyAuthChallengeResponseTriggerEvent
Custom Email Sender data_classes.cognito_user_pool_event.CustomEmailSenderTriggerEvent
Custom SMS Sender data_classes.cognito_user_pool_event.CustomSMSSenderTriggerEvent

Some examples for the Cognito User Pools Lambda trigger sources:

Post Confirmation Example

from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import PostConfirmationTriggerEvent


def lambda_handler(event, context):
    event: PostConfirmationTriggerEvent = PostConfirmationTriggerEvent(event)

    user_attributes = event.request.user_attributes

    return {"statusCode": 200, "body": f"User attributes: {user_attributes}"}
{
  "version": "string",
  "triggerSource": "PostConfirmation_ConfirmSignUp",
  "region": "us-east-1",
  "userPoolId": "string",
  "userName": "userName",
  "callerContext": {
    "awsSdkVersion": "awsSdkVersion",
    "clientId": "clientId"
  },
  "request": {
    "userAttributes": {
      "email": "user@example.com",
      "email_verified": true
    }
  },
  "response": {}
}

Define Auth Challenge Example

Note

In this example we are modifying the wrapped dict response fields, so we need to return the JSON-serializable wrapped event via event.raw_event.

This example is based on the AWS Cognito docs for Define Auth Challenge Lambda Trigger.

from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import DefineAuthChallengeTriggerEvent


def lambda_handler(event, context) -> dict:
    event_obj: DefineAuthChallengeTriggerEvent = DefineAuthChallengeTriggerEvent(event)

    if len(event_obj.request.session) == 1 and event_obj.request.session[0].challenge_name == "SRP_A":
        event_obj.response.issue_tokens = False
        event_obj.response.fail_authentication = False
        event_obj.response.challenge_name = "PASSWORD_VERIFIER"
    elif (
        len(event_obj.request.session) == 2
        and event_obj.request.session[1].challenge_name == "PASSWORD_VERIFIER"
        and event_obj.request.session[1].challenge_result
    ):
        event_obj.response.issue_tokens = False
        event_obj.response.fail_authentication = False
        event_obj.response.challenge_name = "CUSTOM_CHALLENGE"
    elif (
        len(event_obj.request.session) == 3
        and event_obj.request.session[2].challenge_name == "CUSTOM_CHALLENGE"
        and event_obj.request.session[2].challenge_result
    ):
        event_obj.response.issue_tokens = True
        event_obj.response.fail_authentication = False
    else:
        event_obj.response.issue_tokens = False
        event_obj.response.fail_authentication = True

    return event_obj.raw_event
{
  "version": "1",
  "region": "us-east-1",
  "userPoolId": "us-east-1_example",
  "userName": "UserName",
  "callerContext": {
    "awsSdkVersion": "awsSdkVersion",
    "clientId": "clientId"
  },
  "triggerSource": "DefineAuthChallenge_Authentication",
  "request": {
    "userAttributes": {
      "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
      "email_verified": "true",
      "name": "First Last",
      "email": "define-auth@mail.com"
    },
    "session" : [
      {
        "challengeName": "PASSWORD_VERIFIER",
        "challengeResult": true
      },
      {
        "challengeName": "CUSTOM_CHALLENGE",
        "challengeResult": true,
        "challengeMetadata": "CAPTCHA_CHALLENGE"
      }
    ],
    "userNotFound": true
  },
  "response": {}
}

Create Auth Challenge Example

This example is based on the AWS Cognito docs for Create Auth Challenge Lambda Trigger.

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import CreateAuthChallengeTriggerEvent


@event_source(data_class=CreateAuthChallengeTriggerEvent)
def handler(event: CreateAuthChallengeTriggerEvent, context) -> dict:
    if event.request.challenge_name == "CUSTOM_CHALLENGE":
        event.response.public_challenge_parameters = {"captchaUrl": "url/123.jpg"}
        event.response.private_challenge_parameters = {"answer": "5"}
        event.response.challenge_metadata = "CAPTCHA_CHALLENGE"
    return event.raw_event
{
  "version": "1",
  "region": "us-east-1",
  "userPoolId": "us-east-1_example",
  "userName": "UserName",
  "callerContext": {
    "awsSdkVersion": "awsSdkVersion",
    "clientId": "clientId"
  },
  "triggerSource": "CreateAuthChallenge_Authentication",
  "request": {
    "userAttributes": {
      "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
      "email_verified": "true",
      "name": "First Last",
      "email": "create-auth@mail.com"
    },
    "challengeName": "PASSWORD_VERIFIER",
    "session" : [
      {
        "challengeName": "CUSTOM_CHALLENGE",
        "challengeResult": true,
        "challengeMetadata": "CAPTCHA_CHALLENGE"
      }
    ],
    "userNotFound": false
  },
  "response": {}
}

Verify Auth Challenge Response Example

This example is based on the AWS Cognito docs for Verify Auth Challenge Response Lambda Trigger.

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import VerifyAuthChallengeResponseTriggerEvent


@event_source(data_class=VerifyAuthChallengeResponseTriggerEvent)
def lambda_handler(event: VerifyAuthChallengeResponseTriggerEvent, context) -> dict:
    event.response.answer_correct = (
        event.request.private_challenge_parameters.get("answer") == event.request.challenge_answer
    )
    return event.raw_event
{
  "version": "1",
  "region": "us-east-1",
  "userPoolId": "us-east-1_example",
  "userName": "UserName",
  "callerContext": {
    "awsSdkVersion": "awsSdkVersion",
    "clientId": "clientId"
  },
  "triggerSource": "VerifyAuthChallengeResponse_Authentication",
  "request": {
    "userAttributes": {
      "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
      "email_verified": "true",
      "name": "First Last",
      "email": "verify-auth@mail.com"
    },
    "privateChallengeParameters": {
      "answer": "challengeAnswer"
    },
    "clientMetadata" : {
      "foo": "value"
    },
    "challengeAnswer": "challengeAnswer",
    "userNotFound": true
  },
  "response": {}
}
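
A related example is the pre sign-up trigger. The sketch below is an illustration rather than an example from the official docs: it auto-confirms users whose email belongs to an assumed example.com domain, using the PreSignUpTriggerEvent response setters.

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import PreSignUpTriggerEvent


@event_source(data_class=PreSignUpTriggerEvent)
def lambda_handler(event: PreSignUpTriggerEvent, context) -> dict:
    # Hypothetical policy: auto-confirm users from an assumed example domain
    email = event.request.user_attributes.get("email", "")
    if email.endswith("@example.com"):
        event.response.auto_confirm_user = True
        event.response.auto_verify_email = True
    # As with the other triggers, Cognito expects the mutated event back
    return event.raw_event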

Connect Contact Flow

The example integrates with Amazon Connect by handling contact flow events. The function converts the event into a ConnectContactFlowEvent object, providing a structured representation of the contact flow data.

from aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event import (
    ConnectContactFlowChannel,
    ConnectContactFlowEndpointType,
    ConnectContactFlowEvent,
    ConnectContactFlowInitiationMethod,
)


def lambda_handler(event, context):
    event: ConnectContactFlowEvent = ConnectContactFlowEvent(event)
    assert event.contact_data.attributes == {"Language": "en-US"}
    assert event.contact_data.channel == ConnectContactFlowChannel.VOICE
    assert event.contact_data.customer_endpoint.endpoint_type == ConnectContactFlowEndpointType.TELEPHONE_NUMBER
    assert event.contact_data.initiation_method == ConnectContactFlowInitiationMethod.API
{
    "Name": "ContactFlowEvent",
    "Details": {
        "ContactData": {
            "Attributes": {
                "Language": "en-US"
            },
            "Channel": "VOICE",
            "ContactId": "5ca32fbd-8f92-46af-92a5-6b0f970f0efe",
            "CustomerEndpoint": {
                "Address": "+11234567890",
                "Type": "TELEPHONE_NUMBER"
            },
            "InitialContactId": "5ca32fbd-8f92-46af-92a5-6b0f970f0efe",
            "InitiationMethod": "API",
            "InstanceARN": "arn:aws:connect:eu-central-1:123456789012:instance/9308c2a1-9bc6-4cea-8290-6c0b4a6d38fa",
            "MediaStreams": {
                "Customer": {
                    "Audio": {
                        "StartFragmentNumber": "91343852333181432392682062622220590765191907586",
                        "StartTimestamp": "1565781909613",
                        "StreamARN": "arn:aws:kinesisvideo:eu-central-1:123456789012:stream/connect-contact-a3d73b84-ce0e-479a-a9dc-5637c9d30ac9/1565272947806"
                    }
                }
            },
            "PreviousContactId": "5ca32fbd-8f92-46af-92a5-6b0f970f0efe",
            "Queue": {
                "ARN": "arn:aws:connect:eu-central-1:123456789012:instance/9308c2a1-9bc6-4cea-8290-6c0b4a6d38fa/queue/5cba7cbf-1ecb-4b6d-b8bd-fe91079b3fc8",
                "Name": "QueueOne"
            },
            "SystemEndpoint": {
                "Address": "+11234567890",
                "Type": "TELEPHONE_NUMBER"
            }
        },
        "Parameters": {
            "ParameterOne": "One",
            "ParameterTwo": "Two"
        }
    }
}

DynamoDB Streams

The DynamoDB data class utility provides the base class DynamoDBStreamEvent, as well as enums for the stream view type (StreamViewType) and event type (DynamoDBRecordEventName). The class automatically deserializes DynamoDB types into their equivalent Python types.

from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
    DynamoDBRecordEventName,
    DynamoDBStreamEvent,
)


def lambda_handler(event, context):
    event: DynamoDBStreamEvent = DynamoDBStreamEvent(event)

    # Multiple records can be delivered in a single event
    for record in event.records:
        if record.event_name == DynamoDBRecordEventName.MODIFY:
            pass
        elif record.event_name == DynamoDBRecordEventName.INSERT:
            pass
    return "success"
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    processed_keys = []
    for record in event.records:
        if record.dynamodb and record.dynamodb.keys and "Id" in record.dynamodb.keys:
            key = record.dynamodb.keys["Id"]
            processed_keys.append(key)

    return {"statusCode": 200, "body": f"Processed keys: {processed_keys}"}
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "ApproximateCreationDateTime": 1693997155.0,
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "eventsource_arn",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "source_arn",
      "eventSource": "aws:dynamodb"
    }
  ]
}
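
Given the sample event above, here is a minimal sketch of the automatic deserialization: attribute values in new_image arrive as plain Python types (for example, the numeric Id becomes a Decimal rather than a {"N": "..."} map).

from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    messages = []
    for record in event.records:
        if record.dynamodb and record.dynamodb.new_image:
            # Values are already deserialized: "Message" is a str, "Id" a Decimal
            messages.append(record.dynamodb.new_image.get("Message"))
    return {"statusCode": 200, "body": f"Messages: {messages}"}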

EventBridge

When an event matching a defined rule occurs in EventBridge, it can automatically trigger a Lambda function, passing the event data as input.

from aws_lambda_powertools.utilities.data_classes import EventBridgeEvent, event_source


@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context):
    detail_type = event.detail_type
    state = event.detail.get("state")

    # Do something

    return {"detail_type": detail_type, "state": state}
{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "111122223333",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "resources": [
    "arn:aws:ec2:us-west-1:123456789012:instance/i-1234567890abcdef0"
  ],
  "detail": {
    "instance_id": "i-1234567890abcdef0",
    "state": "terminated"
  },
  "replay-name": "replay_archive"
}

Kafka

This example is based on the AWS docs for Amazon MSK and self-managed Apache Kafka.

from aws_lambda_powertools.utilities.data_classes import KafkaEvent, event_source


def do_something_with(key: str, value: str):
    print(f"key: {key}, value: {value}")


@event_source(data_class=KafkaEvent)
def lambda_handler(event: KafkaEvent, context):
    for record in event.records:
        do_something_with(record.topic, record.value)
    return "success"
{
  "eventSource":"aws:kafka",
  "eventSourceArn":"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
  "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
  "records":{
     "mytopic-0":[
        {
           "topic":"mytopic",
           "partition":0,
           "offset":15,
           "timestamp":1545084650987,
           "timestampType":"CREATE_TIME",
           "key":"cmVjb3JkS2V5",
           "value":"eyJrZXkiOiJ2YWx1ZSJ9",
           "headers":[
              {
                 "headerKey":[
                    104,
                    101,
                    97,
                    100,
                    101,
                    114,
                    86,
                    97,
                    108,
                    117,
                    101
                 ]
              }
           ]
        }
     ]
  }
}
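
The key and value fields in the sample above are base64 encoded. A minimal sketch using the record decoding helpers: decoded_key returns the raw key bytes, and json_value decodes and deserializes the value, so the sample value eyJrZXkiOiJ2YWx1ZSJ9 becomes {"key": "value"}.

from aws_lambda_powertools.utilities.data_classes import KafkaEvent, event_source


@event_source(data_class=KafkaEvent)
def lambda_handler(event: KafkaEvent, context):
    results = []
    for record in event.records:
        key = record.decoded_key.decode()  # base64-decoded key, e.g. "recordKey"
        payload = record.json_value        # base64-decoded, JSON-deserialized value
        results.append({"key": key, "payload": payload})
    return results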

Kinesis streams

Kinesis events contain base64-encoded data by default. You can use the helper functions to access the data as either JSON or plain text, depending on the original payload.

import json
from typing import Any, Dict, Union

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import KinesisStreamEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=KinesisStreamEvent)
def lambda_handler(event: KinesisStreamEvent, context: LambdaContext):
    for record in event.records:
        kinesis_record = record.kinesis

        payload: Union[Dict[str, Any], str]

        try:
            # Try to parse as JSON first
            payload = kinesis_record.data_as_json()
            logger.info("Received JSON data from Kinesis")
        except json.JSONDecodeError:
            # If JSON parsing fails, get as text
            payload = kinesis_record.data_as_text()
            logger.info("Received text data from Kinesis")

        process_data(payload)

    return {"statusCode": 200, "body": "Processed all records successfully"}


def process_data(data: Union[Dict[str, Any], str]) -> None:
    if isinstance(data, dict):
        # Handle JSON data
        logger.info(f"Processing JSON data: {data}")
        # Add your JSON processing logic here
    else:
        # Handle text data
        logger.info(f"Processing text data: {data}")
        # Add your text processing logic here
{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "approximateArrivalTimestamp": 1545084650.987
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    },
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
        "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
        "approximateArrivalTimestamp": 1545084711.166
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    }
  ]
}

Kinesis Firehose delivery stream

When using Kinesis Firehose, you can use a Lambda function to perform data transformation. For each transformed record, you can choose to:

  • A) Put them back to the delivery stream (default)
  • B) Drop them so consumers don't receive them (e.g., data validation)
  • C) Indicate a record failed data transformation and should be retried

To do that, you can use the KinesisFirehoseDataTransformationResponse class along with helper functions that make it easier to decode and encode base64 data in the stream.

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
    event_source,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    result = KinesisFirehoseDataTransformationResponse()

    for record in event.records:
        # get original data using data_as_text property
        data = record.data_as_text  # (1)!

        ## generate data to return
        transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data}

        processed_record = record.build_data_transformation_response(
            data=base64_from_json(transformed_data),  # (2)!
        )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()
  1. Ingesting JSON payloads?

    Use record.data_as_json to easily deserialize them.
  2. For your convenience, base64_from_json serializes a dict to JSON, then encodes it as base64 data.
from json import JSONDecodeError
from typing import Dict

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
    event_source,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    result = KinesisFirehoseDataTransformationResponse()

    for record in event.records:
        try:
            payload: Dict = record.data_as_json  # decodes and deserialize base64 JSON string

            ## generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}

            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )
        except JSONDecodeError:  # (1)!
            # our producers ingest JSON payloads only; drop malformed records from the stream
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=record.data,
                result="Dropped",
            )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()
  1. This exception is raised by record.data_as_json when the payload is not valid JSON.
from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
    event_source,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    result = KinesisFirehoseDataTransformationResponse()

    for record in event.records:
        try:
            payload = record.data_as_text  # base64 decoded data as str

            # generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}

            # Default result is Ok
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )
        except Exception:
            # add Failed result to processing results, send back to kinesis for retry
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=record.data,
                result="ProcessingFailed",  # (1)!
            )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()
  1. This record will now be sent to your S3 bucket in the processing-failed folder.
{
    "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
    "sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source",
    "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
    "region": "us-east-2",
    "records": [
        {
            "data": "SGVsbG8gV29ybGQ=",
            "recordId": "record1",
            "approximateArrivalTimestamp": 1664028820148,
            "kinesisRecordMetadata": {
                "shardId": "shardId-000000000000",
                "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
                "approximateArrivalTimestamp": 1664028820148,
                "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
                "subsequenceNumber": 0
            }
        },
        {
            "data": "eyJIZWxsbyI6ICJXb3JsZCJ9",
            "recordId": "record2",
            "approximateArrivalTimestamp": 1664028793294,
            "kinesisRecordMetadata": {
                "shardId": "shardId-000000000001",
                "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
                "approximateArrivalTimestamp": 1664028793294,
                "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
                "subsequenceNumber": 0
            }
        }
    ]
}

Lambda Function URL

Lambda Function URLs provide a direct HTTP endpoint for invoking Lambda functions. This feature allows functions to receive and process HTTP requests without the need for additional services like API Gateway.

from aws_lambda_powertools.utilities.data_classes import LambdaFunctionUrlEvent, event_source


@event_source(data_class=LambdaFunctionUrlEvent)
def lambda_handler(event: LambdaFunctionUrlEvent, context):
    if event.request_context.http.method == "GET":
        return {"statusCode": 200, "body": "Hello World!"}
{
   "version":"2.0",
   "routeKey":"$default",
   "rawPath":"/",
   "rawQueryString":"",
   "headers":{
      "sec-fetch-mode":"navigate",
      "x-amzn-tls-version":"TLSv1.2",
      "sec-fetch-site":"cross-site",
      "accept-language":"pt-BR,pt;q=0.9",
      "x-forwarded-proto":"https",
      "x-forwarded-port":"443",
      "x-forwarded-for":"123.123.123.123",
      "sec-fetch-user":"?1",
      "accept":"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
      "x-amzn-tls-cipher-suite":"ECDHE-RSA-AES128-GCM-SHA256",
      "sec-ch-ua":"\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"102\", \"Google Chrome\";v=\"102\"",
      "sec-ch-ua-mobile":"?0",
      "x-amzn-trace-id":"Root=1-62ecd163-5f302e550dcde3b12402207d",
      "sec-ch-ua-platform":"\"Linux\"",
      "host":"<url-id>.lambda-url.us-east-1.on.aws",
      "upgrade-insecure-requests":"1",
      "cache-control":"max-age=0",
      "accept-encoding":"gzip, deflate, br",
      "sec-fetch-dest":"document",
      "user-agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
   },
   "requestContext":{
      "accountId":"anonymous",
      "apiId":"<url-id>",
      "domainName":"<url-id>.lambda-url.us-east-1.on.aws",
      "domainPrefix":"<url-id>",
      "http":{
         "method":"GET",
         "path":"/",
         "protocol":"HTTP/1.1",
         "sourceIp":"123.123.123.123",
         "userAgent":"agent"
      },
      "requestId":"id",
      "routeKey":"$default",
      "stage":"$default",
      "time":"05/Aug/2022:08:14:39 +0000",
      "timeEpoch":1659687279885
   },
   "isBase64Encoded":false
}

Rabbit MQ

The RabbitMQEvent data class is used for Rabbit MQ payloads. See the blog post for more details.

from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent

logger = Logger()


@event_source(data_class=RabbitMQEvent)
def lambda_handler(event: RabbitMQEvent, context):
    for queue_name, messages in event.rmq_messages_by_queue.items():
        logger.debug(f"Messages for queue: {queue_name}")
        for message in messages:
            logger.debug(f"MessageID: {message.basic_properties.message_id}")
            data: Dict = message.json_data
            logger.debug(f"Process json in base64 encoded data str {data}")
    return {
        "queue_name": queue_name,
        "message_id": message.basic_properties.message_id,
    }
{
  "eventSource": "aws:rmq",
  "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8",
  "rmqMessagesByQueue": {
    "pizzaQueue::/": [
      {
        "basicProperties": {
          "contentType": "text/plain",
          "contentEncoding": null,
          "headers": {
            "header1": {
              "bytes": [
                118,
                97,
                108,
                117,
                101,
                49
              ]
            },
            "header2": {
              "bytes": [
                118,
                97,
                108,
                117,
                101,
                50
              ]
            },
            "numberInHeader": 10
          },
          "deliveryMode": 1,
          "priority": 34,
          "correlationId": null,
          "replyTo": null,
          "expiration": "60000",
          "messageId": null,
          "timestamp": "Jan 1, 1970, 12:33:41 AM",
          "type": null,
          "userId": "AIDACKCEVSQ6C2EXAMPLE",
          "appId": null,
          "clusterId": null,
          "bodySize": 80
        },
        "redelivered": false,
        "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
      }
    ]
  }
}

S3

Integration with Amazon S3 enables automatic, serverless processing of object-level events in S3 buckets. When triggered by actions like object creation or deletion, Lambda functions receive detailed event information, allowing for real-time file processing, data transformations, and automated workflows.

from urllib.parse import unquote_plus

from aws_lambda_powertools.utilities.data_classes import S3Event, event_source


@event_source(data_class=S3Event)
def lambda_handler(event: S3Event, context):
    bucket_name = event.bucket_name

    # Multiple records can be delivered in a single event
    for record in event.records:
        object_key = unquote_plus(record.s3.get_object.key)
        object_etag = record.s3.get_object.etag
    return {
        "bucket": bucket_name,
        "object_key": object_key,
        "object_etag": object_etag,
    }
{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-2",
      "eventTime": "2019-09-03T19:37:27.192Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:AIDAINPONIXQXHT3IKHL2"
      },
      "requestParameters": {
        "sourceIPAddress": "205.255.255.255"
      },
      "responseElements": {
        "x-amz-request-id": "D82B88E5F771F645",
        "x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo="
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
        "bucket": {
          "name": "lambda-artifacts-deafc19498e3f2df",
          "ownerIdentity": {
            "principalId": "A3I5XTEXAMAI3E"
          },
          "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
        },
        "object": {
          "key": "b21b84d653bb07b05b1e6b33684dc11b",
          "size": 1305107,
          "eTag": "b21b84d653bb07b05b1e6b33684dc11b",
          "sequencer": "0C0F6F405D6ED209E1"
        }
      }
    }
  ]
}

S3 Batch Operations

This example is based on the Example Lambda function for S3 Batch Operations in the AWS S3 Batch Operations documentation.

import boto3
from botocore.exceptions import ClientError

from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent, S3BatchOperationResponse, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=S3BatchOperationEvent)
def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
    response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")

    task = event.task
    src_key: str = task.s3_key
    src_bucket: str = task.s3_bucket

    s3 = boto3.client("s3", region_name="us-east-1")

    try:
        dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
        result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        if error_code == "RequestTimeout":
            result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
        else:
            result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
    except Exception as e:
        result = task.build_task_batch_response("PermanentFailure", str(e))
    finally:
        response.add_result(result)

    return response.asdict()


def do_some_work(s3_client, src_bucket: str, src_key: str):
    ...
{
  "invocationSchemaVersion": "2.0",
  "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
  "job": {
    "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce",
    "userArguments": {
      "k1": "v1",
      "k2": "v2"
    }
  },
  "tasks": [
    {
      "taskId": "dGFza2lkZ29lc2hlcmUK",
      "s3Key": "prefix/dataset/dataset.20231222.json.gz",
      "s3VersionId": null,
      "s3Bucket": "powertools-dataset"
    }
  ]
}

S3 Object Lambda

This example is based on the AWS Blog post Introducing Amazon S3 Object Lambda – Use Your Code to Process Data as It Is Being Retrieved from S3.

import boto3
import requests

from aws_lambda_powertools import Logger
from aws_lambda_powertools.logging.correlation_paths import S3_OBJECT_LAMBDA
from aws_lambda_powertools.utilities.data_classes.s3_object_event import S3ObjectLambdaEvent

logger = Logger()
session = boto3.session.Session()
s3 = session.client("s3")


@logger.inject_lambda_context(correlation_id_path=S3_OBJECT_LAMBDA, log_event=True)
def lambda_handler(event, context):
    event = S3ObjectLambdaEvent(event)

    # Get object from S3
    response = requests.get(event.input_s3_url)
    original_object = response.content.decode("utf-8")

    # Make changes to the object about to be returned
    transformed_object = original_object.upper()

    # Write object back to S3 Object Lambda
    s3.write_get_object_response(
        Body=transformed_object,
        RequestRoute=event.request_route,
        RequestToken=event.request_token,
    )

    return {"status_code": 200}
{
    "xAmzRequestId": "1a5ed718-5f53-471d-b6fe-5cf62d88d02a",
    "getObjectContext": {
        "inputS3Url": "https://myap-123412341234.s3-accesspoint.us-east-1.amazonaws.com/s3.txt?X-Amz-Security-Token=...",
        "outputRoute": "io-iad-cell001",
        "outputToken": "..."
    },
    "configuration": {
        "accessPointArn": "arn:aws:s3-object-lambda:us-east-1:123412341234:accesspoint/myolap",
        "supportingAccessPointArn": "arn:aws:s3:us-east-1:123412341234:accesspoint/myap",
        "payload": "test"
    },
    "userRequest": {
        "url": "/s3.txt",
        "headers": {
            "Host": "myolap-123412341234.s3-object-lambda.us-east-1.amazonaws.com",
            "Accept-Encoding": "identity",
            "X-Amz-Content-SHA256": "e3b0c44297fc1c149afbf4c8995fb92427ae41e4649b934ca495991b7852b855"
        }
    },
    "userIdentity": {
        "type": "IAMUser",
        "principalId": "...",
        "arn": "arn:aws:iam::123412341234:user/myuser",
        "accountId": "123412341234",
        "accessKeyId": "..."
    },
    "protocolVersion": "1.00"
}

S3 EventBridge Notification

S3 EventBridge notifications enhance Lambda's ability to process S3 events by routing them through Amazon EventBridge. This integration offers advanced filtering, multiple destination support, and standardized CloudEvents format.

from aws_lambda_powertools.utilities.data_classes import S3EventBridgeNotificationEvent, event_source


@event_source(data_class=S3EventBridgeNotificationEvent)
def lambda_handler(event: S3EventBridgeNotificationEvent, context):
    bucket_name = event.detail.bucket.name
    file_key = event.detail.object.key
    if event.detail_type == "Object Created":
        print(f"Object {file_key} created in bucket {bucket_name}")
    return {
        "bucket": bucket_name,
        "file_key": file_key,
    }
{
    "version": "0",
    "id": "f5f1e65c-dc3a-93ca-6c1e-b1647eac7963",
    "detail-type": "Object Created",
    "source": "aws.s3",
    "account": "123456789012",
    "time": "2023-03-08T17:50:14Z",
    "region": "eu-west-1",
    "resources": [
        "arn:aws:s3:::example-bucket"
    ],
    "detail": {
        "version": "0",
        "bucket": {
            "name": "example-bucket"
        },
        "object": {
            "key": "IMG_m7fzo3.jpg",
            "size": 184662,
            "etag": "4e68adba0abe2dc8653dc3354e14c01d",
            "sequencer": "006408CAD69598B05E"
        },
        "request-id": "57H08PA84AB1JZW0",
        "requester": "123456789012",
        "source-ip-address": "34.252.34.74",
        "reason": "PutObject"
    }
}

Secrets Manager

AWS Secrets Manager rotation uses an AWS Lambda function to update the secret. Click here for more information about rotating AWS Secrets Manager secrets.

from aws_lambda_powertools.utilities import parameters
from aws_lambda_powertools.utilities.data_classes import SecretsManagerEvent, event_source

secrets_provider = parameters.SecretsProvider()


@event_source(data_class=SecretsManagerEvent)
def lambda_handler(event: SecretsManagerEvent, context):
    # Getting secret value using Parameter utility
    # See https://docs.powertools.aws.dev/lambda/python/latest/utilities/parameters/
    secret = secrets_provider.get(event.secret_id, VersionId=event.version_id, VersionStage="AWSCURRENT")

    # You need to work with secrets afterwards
    # Check more examples: https://github.com/aws-samples/aws-secrets-manager-rotation-lambdas

    return secret
{
    "SecretId":"arn:aws:secretsmanager:us-west-2:123456789012:secret:MyTestDatabaseSecret-a1b2c3",
    "ClientRequestToken":"550e8400-e29b-41d4-a716-446655440000",
    "Step":"createSecret"
}
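
Rotation invokes the same function once per step (createSecret, setSecret, testSecret, finishSecret), so handlers typically dispatch on event.step. A minimal sketch of that dispatch, with the per-step logic left as placeholders for your own rotation code:

from aws_lambda_powertools.utilities.data_classes import SecretsManagerEvent, event_source


@event_source(data_class=SecretsManagerEvent)
def lambda_handler(event: SecretsManagerEvent, context):
    # Secrets Manager calls this function once per rotation step
    if event.step == "createSecret":
        ...  # generate and stage a new secret version
    elif event.step == "setSecret":
        ...  # apply the staged secret to the target service
    elif event.step == "testSecret":
        ...  # verify the staged secret works
    elif event.step == "finishSecret":
        ...  # move the AWSCURRENT stage to the new version
    else:
        raise ValueError(f"Unknown rotation step: {event.step}")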

SES

The integration with Simple Email Service (SES) enables serverless email processing. When configured, SES can trigger Lambda functions in response to incoming emails or delivery status notifications. The Lambda function receives an SES event containing details like sender, recipients, and email content.

from aws_lambda_powertools.utilities.data_classes import SESEvent, event_source


@event_source(data_class=SESEvent)
def lambda_handler(event: SESEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        mail = record.ses.mail
        common_headers = mail.common_headers
    return {
        "mail": mail,
        "common_headers": common_headers,
    }
{
  "Records": [
    {
      "eventVersion": "1.0",
      "ses": {
        "mail": {
          "commonHeaders": {
            "from": [
              "Jane Doe <janedoe@example.com>"
            ],
            "to": [
              "johndoe@example.com"
            ],
            "returnPath": "janedoe@example.com",
            "messageId": "<0123456789example.com>",
            "date": "Wed, 7 Oct 2015 12:34:56 -0700",
            "subject": "Test Subject"
          },
          "source": "janedoe@example.com",
          "timestamp": "1970-01-01T00:00:00.000Z",
          "destination": [
            "johndoe@example.com"
          ],
          "headers": [
            {
              "name": "Return-Path",
              "value": "<janedoe@example.com>"
            },
            {
              "name": "Received",
              "value": "from mailer.example.com (mailer.example.com [203.0.113.1]) by ..."
            },
            {
              "name": "DKIM-Signature",
              "value": "v=1; a=rsa-sha256; c=relaxed/relaxed; d=example.com; s=example; ..."
            },
            {
              "name": "MIME-Version",
              "value": "1.0"
            },
            {
              "name": "From",
              "value": "Jane Doe <janedoe@example.com>"
            },
            {
              "name": "Date",
              "value": "Wed, 7 Oct 2015 12:34:56 -0700"
            },
            {
              "name": "Message-ID",
              "value": "<0123456789example.com>"
            },
            {
              "name": "Subject",
              "value": "Test Subject"
            },
            {
              "name": "To",
              "value": "johndoe@example.com"
            },
            {
              "name": "Content-Type",
              "value": "text/plain; charset=UTF-8"
            }
          ],
          "headersTruncated": false,
          "messageId": "o3vrnil0e2ic28tr"
        },
        "receipt": {
          "recipients": [
            "johndoe@example.com"
          ],
          "timestamp": "1970-01-01T00:00:00.000Z",
          "spamVerdict": {
            "status": "PASS"
          },
          "dkimVerdict": {
            "status": "PASS"
          },
          "dmarcPolicy": "reject",
          "processingTimeMillis": 574,
          "action": {
            "type": "Lambda",
            "invocationType": "Event",
            "functionArn": "arn:aws:lambda:us-west-2:012345678912:function:Example"
          },
          "dmarcVerdict": {
            "status": "PASS"
          },
          "spfVerdict": {
            "status": "PASS"
          },
          "virusVerdict": {
            "status": "PASS"
          }
        }
      },
      "eventSource": "aws:ses"
    }
  ]
}

SNS

The integration with Simple Notification Service (SNS) enables serverless message processing. When configured, SNS can trigger Lambda functions in response to published messages or notifications. The Lambda function receives an SNS event containing details like the message body, subject, and metadata.

from aws_lambda_powertools.utilities.data_classes import SNSEvent, event_source


@event_source(data_class=SNSEvent)
def lambda_handler(event: SNSEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        message = record.sns.message
        subject = record.sns.subject
    return {
        "message": message,
        "subject": subject,
    }
{
  "Records": [
    {
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws:sns:us-east-2:123456789012:sns-la ...",
      "EventSource": "aws:sns",
      "Sns": {
        "SignatureVersion": "1",
        "Timestamp": "2019-01-02T12:45:07.000Z",
        "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==",
        "SigningCertUrl": "https://sns.us-east-2.amazonaws.com/SimpleNotification",
        "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
        "Message": "Hello from SNS!",
        "MessageAttributes": {
          "Test": {
            "Type": "String",
            "Value": "TestString"
          },
          "TestBinary": {
            "Type": "Binary",
            "Value": "TestBinary"
          }
        },
        "Type": "Notification",
        "UnsubscribeUrl": "https://sns.us-east-2.amazonaws.com/?Action=Unsubscribe",
        "TopicArn": "arn:aws:sns:us-east-2:123456789012:sns-lambda",
        "Subject": "TestInvoke"
      }
    }
  ]
}

SQS

The integration with Simple Queue Service (SQS) enables serverless queue processing. When configured, SQS can trigger Lambda functions in response to messages in the queue. The Lambda function receives an SQS event containing details like message body, attributes, and metadata.

from aws_lambda_powertools.utilities.data_classes import SQSEvent, event_source


@event_source(data_class=SQSEvent)
def lambda_handler(event: SQSEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        message = record.body
        message_id = record.message_id
    return {
        "message": message,
        "message_id": message_id,
    }
{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
      "body": "Test message.",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {
        "testAttr": {
          "stringValue": "100",
          "binaryValue": "base64Str",
          "dataType": "Number"
        }
      },
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-2"
    },
    {
      "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
      "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
      "body": "{\"message\": \"foo1\"}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082650636",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082650649"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-2"
    }
  ]
}
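
The second sample record above carries a JSON body. A minimal sketch using the json_body helper to deserialize it, falling back to the raw body for plain-text messages:

from json import JSONDecodeError

from aws_lambda_powertools.utilities.data_classes import SQSEvent, event_source


@event_source(data_class=SQSEvent)
def lambda_handler(event: SQSEvent, context):
    payloads = []
    for record in event.records:
        try:
            payloads.append(record.json_body)  # deserialized dict, e.g. {"message": "foo1"}
        except JSONDecodeError:
            payloads.append(record.body)  # plain-text body, e.g. "Test message."
    return {"statusCode": 200, "body": f"Processed {len(payloads)} messages"}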

VPC Lattice V2

You can register your Lambda functions as targets within an Amazon VPC Lattice service network. By doing this, your Lambda function becomes a service within the network, and clients that have access to the VPC Lattice service network can call your service using Payload V2.

Click here for more information about using AWS Lambda with Amazon VPC Lattice.

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import VPCLatticeEventV2, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=VPCLatticeEventV2)
def lambda_handler(event: VPCLatticeEventV2, context: LambdaContext):
    logger.info(event.body)

    response = {
        "isBase64Encoded": False,
        "statusCode": 200,
        "statusDescription": "200 OK",
        "headers": {"Content-Type": "application/text"},
        "body": "VPC Lattice V2 Event ✨🎉✨",
    }

    return response
{
  "version": "2.0",
  "path": "/todos",
  "method": "GET",
  "headers": {
    "user_agent": "curl/7.64.1",
    "x-forwarded-for": "10.213.229.10",
    "host": "test-lambda-service-3908sdf9u3u.dkfjd93.vpc-lattice-svcs.us-east-2.on.aws",
    "accept": "*/*"
  },
  "queryStringParameters": {
    "order-id": "1"
  },
  "body": "{\"message\": \"Hello from Lambda!\"}",
  "requestContext": {
      "serviceNetworkArn": "arn:aws:vpc-lattice:us-east-2:123456789012:servicenetwork/sn-0bf3f2882e9cc805a",
      "serviceArn": "arn:aws:vpc-lattice:us-east-2:123456789012:service/svc-0a40eebed65f8d69c",
      "targetGroupArn": "arn:aws:vpc-lattice:us-east-2:123456789012:targetgroup/tg-6d0ecf831eec9f09",
      "identity": {
        "sourceVpcArn": "arn:aws:ec2:region:123456789012:vpc/vpc-0b8276c84697e7339",
        "type" : "AWS_IAM",
        "principal": "arn:aws:sts::123456789012:assumed-role/example-role/057d00f8b51257ba3c853a0f248943cf",
        "sessionName": "057d00f8b51257ba3c853a0f248943cf",
        "x509SanDns": "example.com"
      },
      "region": "us-east-2",
      "timeEpoch": "1696331543569073"
  }
}

VPC Lattice V1

You can register your Lambda functions as targets within an Amazon VPC Lattice service network. By doing this, your Lambda function becomes a service within the network, and clients that have access to the VPC Lattice service network can call your service.

Click here for more information about using AWS Lambda with Amazon VPC Lattice.

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import VPCLatticeEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=VPCLatticeEvent)
def lambda_handler(event: VPCLatticeEvent, context: LambdaContext):
    logger.info(event.body)

    response = {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "application/text"},
        "body": "Event Response to VPC Lattice 🔥🚀🔥",
    }

    return response
{
    "raw_path": "/testpath",
    "method": "GET",
    "headers": {
      "user_agent": "curl/7.64.1",
      "x-forwarded-for": "10.213.229.10",
      "host": "test-lambda-service-3908sdf9u3u.dkfjd93.vpc-lattice-svcs.us-east-2.on.aws",
      "accept": "*/*"
    },
    "query_string_parameters": {
      "order-id": "1"
    },
    "body": "eyJ0ZXN0IjogImV2ZW50In0=",
    "is_base64_encoded": true
}

Advanced

Debugging

You can also print out the event fields to obtain more information. All classes come with a __str__ method that generates a dictionary string, which can be quite useful for debugging.

However, certain events may contain sensitive fields such as secret_access_key and session_token, which are labeled as [SENSITIVE] to prevent any accidental disclosure of confidential information.

If we fail to deserialize a field value (e.g., JSON), it will appear as [Cannot be deserialized].

from aws_lambda_powertools.utilities.data_classes import (
    CodePipelineJobEvent,
    event_source,
)


@event_source(data_class=CodePipelineJobEvent)
def lambda_handler(event, context):
    print(event)
{
    "CodePipeline.job": {
        "id": "11111111-abcd-1111-abcd-111111abcdef",
        "accountId": "111111111111",
        "data": {
            "actionConfiguration": {
                "configuration": {
                    "FunctionName": "MyLambdaFunctionForAWSCodePipeline",
                    "UserParameters": "some-input-such-as-a-URL"
                }
            },
            "inputArtifacts": [
                {
                    "name": "ArtifactName",
                    "revision": null,
                    "location": {
                        "type": "S3",
                        "s3Location": {
                            "bucketName": "the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890",
                            "objectKey": "the name of the application, for example CodePipelineDemoApplication.zip"
                        }
                    }
                }
            ],
            "outputArtifacts": [],
            "artifactCredentials": {
                "accessKeyId": "AKIAIOSFODNN7EXAMPLE",
                "secretAccessKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
                "sessionToken": "MIICiTCCAfICCQD6m7oRw0uXOjANBgkqhkiG9w0BAQUFADCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wHhcNMTEwNDI1MjA0NTIxWhcNMTIwNDI0MjA0NTIxWjCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMaK0dn+a4GmWIWJ21uUSfwfEvySWtC2XADZ4nB+BLYgVIk60CpiwsZ3G93vUEIO3IyNoH/f0wYK8m9TrDHudUZg3qX4waLG5M43q7Wgc/MbQITxOUSQv7c7ugFFDzQGBzZswY6786m86gpEIbb3OhjZnzcvQAaRHhdlQWIMm2nrAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAtCu4nUhVVxYUntneD9+h8Mg9q6q+auNKyExzyLwaxlAoo7TJHidbtS4J5iNmZgXL0FkbFFBjvSfpJIlJ00zbhNYS5f6GuoEDmFJl0ZxBHjJnyp378OD8uTs7fLvjx79LjSTbNYiytVbZPQUQ5Yaxu2jXnimvw3rrszlaEXAMPLE="
            },
            "continuationToken": "A continuation token if continuing job"
        }
    }
}
{
    "account_id":"111111111111",
    "data":{
       "action_configuration":{
          "configuration":{
             "decoded_user_parameters":"[Cannot be deserialized]",
             "function_name":"MyLambdaFunctionForAWSCodePipeline",
             "raw_event":"[SENSITIVE]",
             "user_parameters":"some-input-such-as-a-URL"
          },
          "raw_event":"[SENSITIVE]"
       },
       "artifact_credentials":{
          "access_key_id":"AKIAIOSFODNN7EXAMPLE",
          "expiration_time":"None",
          "raw_event":"[SENSITIVE]",
          "secret_access_key":"[SENSITIVE]",
          "session_token":"[SENSITIVE]"
       },
       "continuation_token":"A continuation token if continuing job",
       "encryption_key":"None",
       "input_artifacts":[
          {
             "location":{
                "get_type":"S3",
                "raw_event":"[SENSITIVE]",
                "s3_location":{
                   "bucket_name":"the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890",
                   "key":"the name of the application, for example CodePipelineDemoApplication.zip",
                   "object_key":"the name of the application, for example CodePipelineDemoApplication.zip",
                   "raw_event":"[SENSITIVE]"
                }
             },
             "name":"ArtifactName",
             "raw_event":"[SENSITIVE]",
             "revision":"None"
          }
       ],
       "output_artifacts":[

       ],
       "raw_event":"[SENSITIVE]"
    },
    "decoded_user_parameters":"[Cannot be deserialized]",
    "get_id":"11111111-abcd-1111-abcd-111111abcdef",
    "input_bucket_name":"the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890",
    "input_object_key":"the name of the application, for example CodePipelineDemoApplication.zip",
    "raw_event":"[SENSITIVE]",
    "user_parameters":"some-input-such-as-a-URL"
}