Skip to content

Event Source Data Classes

Event Source Data Classes utility provides classes self-describing Lambda event sources.

Key features

  • Type hinting and code completion for common event types
  • Helper functions for decoding/deserializing nested fields
  • Docstrings for fields contained in event schemas

Background

When authoring Lambda functions, you often need to understand the schema of the event dictionary which is passed to the handler. There are several common event types which follow a specific schema, depending on the service triggering the Lambda function.

Getting started

Utilizing the data classes

The classes are initialized by passing in the Lambda event object into the constructor of the appropriate data class or by using the event_source decorator.

For example, if your Lambda function is being triggered by an API Gateway proxy integration, you can use the APIGatewayProxyEvent class.

1
2
3
4
5
6
from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent

def lambda_handler(event: dict, context):
    event = APIGatewayProxyEvent(event)
    if 'helloworld' in event.path and event.http_method == 'GET':
        do_something_with(event.body, user)

Same example as above, but using the event_source decorator

1
2
3
4
5
6
from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent

@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context):
    if 'helloworld' in event.path and event.http_method == 'GET':
        do_something_with(event.body, user)

Log Data Event for Troubleshooting

1
2
3
4
5
6
7
8
from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent
from aws_lambda_powertools.logging.logger import Logger

logger = Logger(service="hello_logs", level="DEBUG")

@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context):
    logger.debug(event)

Autocomplete with self-documented properties and methods

Utilities Data Classes

Supported event sources

Event Source Data_class
Active MQ ActiveMQEvent
API Gateway Authorizer APIGatewayAuthorizerRequestEvent
API Gateway Authorizer V2 APIGatewayAuthorizerEventV2
API Gateway Proxy APIGatewayProxyEvent
API Gateway Proxy V2 APIGatewayProxyEventV2
Application Load Balancer ALBEvent
AppSync Authorizer AppSyncAuthorizerEvent
AppSync Resolver AppSyncResolverEvent
AWS Config Rule AWSConfigRuleEvent
Bedrock Agent BedrockAgent
CloudFormation Custom Resource CloudFormationCustomResourceEvent
CloudWatch Alarm State Change Action CloudWatchAlarmEvent
CloudWatch Dashboard Custom Widget CloudWatchDashboardCustomWidgetEvent
CloudWatch Logs CloudWatchLogsEvent
CodeDeploy Lifecycle Hook CodeDeployLifecycleHookEvent
CodePipeline Job Event CodePipelineJobEvent
Cognito User Pool Multiple available under cognito_user_pool_event
Connect Contact Flow ConnectContactFlowEvent
DynamoDB streams DynamoDBStreamEvent, DynamoDBRecordEventName
EventBridge EventBridgeEvent
Kafka KafkaEvent
Kinesis Data Stream KinesisStreamEvent
Kinesis Firehose Delivery Stream KinesisFirehoseEvent
Lambda Function URL LambdaFunctionUrlEvent
Rabbit MQ RabbitMQEvent
S3 S3Event
S3 Batch Operations S3BatchOperationEvent
S3 Object Lambda S3ObjectLambdaEvent
S3 EventBridge Notification S3EventBridgeNotificationEvent
SES SESEvent
SNS SNSEvent
SQS SQSEvent
VPC Lattice V2 VPCLatticeV2Event
VPC Lattice V1 VPCLatticeEvent
Info

The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of documentation inherently (via autocompletion, types and docstrings).

Active MQ

It is used for Active MQ payloads, also see the AWS blog post for more details.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent

logger = Logger()

@event_source(data_class=ActiveMQEvent)
def lambda_handler(event: ActiveMQEvent, context):
    for message in event.messages:
        logger.debug(f"MessageID: {message.message_id}")
        data: Dict = message.json_data
        logger.debug("Process json in base64 encoded data str", data)

API Gateway Authorizer

New in 1.20.0

It is used for API Gateway Rest API Lambda Authorizer payload.

Use APIGatewayAuthorizerRequestEvent for type REQUEST and APIGatewayAuthorizerTokenEvent for type TOKEN.

This example uses the APIGatewayAuthorizerResponse to decline a given request if the user is not found.

When the user is found, it includes the user details in the request context that will be available to the back-end, and returns a full access policy for admin users.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
    DENY_ALL_RESPONSE,
    APIGatewayAuthorizerRequestEvent,
    APIGatewayAuthorizerResponse,
    HttpVerb,
)
from secrets import compare_digest


def get_user_by_token(token):
    if compare_digest(token, "admin-foo"):
        return {"id": 0, "name": "Admin", "isAdmin": True}
    elif compare_digest(token, "regular-foo"):
        return {"id": 1, "name": "Joe"}
    else:
        return None


@event_source(data_class=APIGatewayAuthorizerRequestEvent)
def handler(event: APIGatewayAuthorizerRequestEvent, context):
    user = get_user_by_token(event.headers["Authorization"])

    if user is None:
        # No user was found
        # to return 401 - `{"message":"Unauthorized"}`, but pollutes lambda error count metrics
        # raise Exception("Unauthorized")
        # to return 403 - `{"message":"Forbidden"}`
        return DENY_ALL_RESPONSE

    # parse the `methodArn` as an `APIGatewayRouteArn`
    arn = event.parsed_arn

    # Create the response builder from parts of the `methodArn`
    # and set the logged in user id and context
    policy = APIGatewayAuthorizerResponse(
        principal_id=user["id"],
        context=user,
        region=arn.region,
        aws_account_id=arn.aws_account_id,
        api_id=arn.api_id,
        stage=arn.stage,
    )

    # Conditional IAM Policy
    if user.get("isAdmin", False):
        policy.allow_all_routes()
    else:
        policy.allow_route(HttpVerb.GET.value, "/user-profile")

    return policy.asdict()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
    APIGatewayAuthorizerTokenEvent,
    APIGatewayAuthorizerResponse,
)


@event_source(data_class=APIGatewayAuthorizerTokenEvent)
def handler(event: APIGatewayAuthorizerTokenEvent, context):
    arn = event.parsed_arn

    policy = APIGatewayAuthorizerResponse(
        principal_id="user",
        region=arn.region,
        aws_account_id=arn.aws_account_id,
        api_id=arn.api_id,
        stage=arn.stage
    )

    if event.authorization_token == "42":
        policy.allow_all_routes()
    else:
        policy.deny_all_routes()
    return policy.asdict()

API Gateway Authorizer V2

New in 1.20.0

It is used for API Gateway HTTP API Lambda Authorizer payload version 2. See also this blog post for more details.

This example looks up user details via x-token header. It uses APIGatewayAuthorizerResponseV2 to return a deny policy when user is not found or authorized.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
    APIGatewayAuthorizerEventV2,
    APIGatewayAuthorizerResponseV2,
)
from secrets import compare_digest


def get_user_by_token(token):
    if compare_digest(token, "Foo"):
        return {"name": "Foo"}
    return None


@event_source(data_class=APIGatewayAuthorizerEventV2)
def handler(event: APIGatewayAuthorizerEventV2, context):
    user = get_user_by_token(event.headers["x-token"])

    if user is None:
        # No user was found, so we return not authorized
        return APIGatewayAuthorizerResponseV2().asdict()

    # Found the user and setting the details in the context
    return APIGatewayAuthorizerResponseV2(authorize=True, context=user).asdict()

API Gateway Proxy

It is used for either API Gateway REST API or HTTP API using v1 proxy event.

1
2
3
4
5
6
7
8
9
from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent

@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context):
    if "helloworld" in event.path and event.http_method == "GET":
        request_context = event.request_context
        identity = request_context.identity
        user = identity.user
        do_something_with(event.json_body, user)

API Gateway Proxy V2

It is used for HTTP API using v2 proxy event.

1
2
3
4
5
6
from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEventV2

@event_source(data_class=APIGatewayProxyEventV2)
def lambda_handler(event: APIGatewayProxyEventV2, context):
    if "helloworld" in event.path and event.http_method == "POST":
        do_something_with(event.json_body, event.query_string_parameters)

Application Load Balancer

Is it used for Application load balancer event.

1
2
3
4
5
6
from aws_lambda_powertools.utilities.data_classes import event_source, ALBEvent

@event_source(data_class=ALBEvent)
def lambda_handler(event: ALBEvent, context):
    if "helloworld" in event.path and event.http_method == "POST":
        do_something_with(event.json_body, event.query_string_parameters)

AppSync Authorizer

New in 1.20.0

Used when building an AWS_LAMBDA Authorization with AppSync. See blog post Introducing Lambda authorization for AWS AppSync GraphQL APIs or read the Amplify documentation on using AWS Lambda for authorization with AppSync.

In this example extract the requestId as the correlation_id for logging, used @event_source decorator and builds the AppSync authorizer using the AppSyncAuthorizerResponse helper.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from typing import Dict

from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.logging.logger import Logger
from aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event import (
    AppSyncAuthorizerEvent,
    AppSyncAuthorizerResponse,
)
from aws_lambda_powertools.utilities.data_classes.event_source import event_source

logger = Logger()


def get_user_by_token(token: str):
    """Look a user by token"""
    ...


@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER)
@event_source(data_class=AppSyncAuthorizerEvent)
def lambda_handler(event: AppSyncAuthorizerEvent, context) -> Dict:
    user = get_user_by_token(event.authorization_token)

    if not user:
        # No user found, return not authorized
        return AppSyncAuthorizerResponse().asdict()

    return AppSyncAuthorizerResponse(
        authorize=True,
        resolver_context={"id": user.id},
        # Only allow admins to delete events
        deny_fields=None if user.is_admin else ["Mutation.deleteEvent"],
    ).asdict()

AppSync Resolver

New in 1.12.0

Used when building Lambda GraphQL Resolvers with Amplify GraphQL Transform Library (@function), and AppSync Direct Lambda Resolvers.

In this example, we also use the new Logger correlation_id and built-in correlation_paths to extract, if available, X-Ray Trace ID in AppSync request headers:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from aws_lambda_powertools.logging import Logger, correlation_paths
from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import (
    AppSyncResolverEvent,
    AppSyncIdentityCognito
)

logger = Logger()

def get_locations(name: str = None, size: int = 0, page: int = 0):
    """Your resolver logic here"""

@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
def lambda_handler(event, context):
    event: AppSyncResolverEvent = AppSyncResolverEvent(event)

    # Case insensitive look up of request headers
    x_forwarded_for = event.headers.get("x-forwarded-for")

    # Support for AppSyncIdentityCognito or AppSyncIdentityIAM identity types
    assert isinstance(event.identity, AppSyncIdentityCognito)
    identity: AppSyncIdentityCognito = event.identity

    # Logging with correlation_id
    logger.debug({
        "x-forwarded-for": x_forwarded_for,
        "username": identity.username
    })

    if event.type_name == "Merchant" and event.field_name == "locations":
        return get_locations(**event.arguments)

    raise ValueError(f"Unsupported field resolver: {event.field_name}")
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
  "typeName": "Merchant",
  "fieldName": "locations",
  "arguments": {
    "page": 2,
    "size": 1,
    "name": "value"
  },
  "identity": {
    "claims": {
      "iat": 1615366261
      ...
    },
    "username": "mike",
    ...
  },
  "request": {
    "headers": {
      "x-amzn-trace-id": "Root=1-60488877-0b0c4e6727ab2a1c545babd0",
      "x-forwarded-for": "127.0.0.1"
      ...
    }
  },
  ...
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
{
    "level":"DEBUG",
    "location":"lambda_handler:22",
    "message":{
        "x-forwarded-for":"127.0.0.1",
        "username":"mike"
    },
    "timestamp":"2021-03-10 12:38:40,062",
    "service":"service_undefined",
    "sampling_rate":0.0,
    "cold_start":true,
    "function_name":"func_name",
    "function_memory_size":512,
    "function_arn":"func_arn",
    "function_request_id":"6735a29c-c000-4ae3-94e6-1f1c934f7f94",
    "correlation_id":"Root=1-60488877-0b0c4e6727ab2a1c545babd0"
}

AWS Config Rule

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
    AWSConfigRuleEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=AWSConfigRuleEvent)
def lambda_handler(event: AWSConfigRuleEvent, context: LambdaContext):
    message_type = event.invoking_event.message_type

    logger.info(f"Logging {message_type} event rule", invoke_event=event.raw_invoking_event)

    return {"Success": "OK"}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
{
    "version":"1.0",
    "invokingEvent":"{\"configurationItemDiff\":{\"changedProperties\":{\"Configuration.InstanceType\":{\"previousValue\":\"t2.micro\",\"updatedValue\":\"t2.medium\",\"changeType\":\"UPDATE\"},\"Configuration.State.Name\":{\"previousValue\":\"running\",\"updatedValue\":\"stopped\",\"changeType\":\"UPDATE\"},\"Configuration.StateTransitionReason\":{\"previousValue\":\"\",\"updatedValue\":\"User initiated (2023-04-27 15:01:07 GMT)\",\"changeType\":\"UPDATE\"},\"Configuration.StateReason\":{\"previousValue\":null,\"updatedValue\":{\"code\":\"Client.UserInitiatedShutdown\",\"message\":\"Client.UserInitiatedShutdown: User initiated shutdown\"},\"changeType\":\"CREATE\"},\"Configuration.CpuOptions.CoreCount\":{\"previousValue\":1,\"updatedValue\":2,\"changeType\":\"UPDATE\"}},\"changeType\":\"UPDATE\"},\"configurationItem\":{\"relatedEvents\":[],\"relationships\":[{\"resourceId\":\"eipalloc-0ebb4367662263cc1\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::EIP\",\"name\":\"Is attached to ElasticIp\"},{\"resourceId\":\"eni-034dd31c4b17ada8c\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::NetworkInterface\",\"name\":\"Contains NetworkInterface\"},{\"resourceId\":\"eni-09a604c0ec356b06f\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::NetworkInterface\",\"name\":\"Contains NetworkInterface\"},{\"resourceId\":\"sg-0fb295a327d9b4835\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::SecurityGroup\",\"name\":\"Is associated with SecurityGroup\"},{\"resourceId\":\"subnet-cad1f2f4\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::Subnet\",\"name\":\"Is contained in Subnet\"},{\"resourceId\":\"vol-0a288b5eb9fea4b30\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::Volume\",\"name\":\"Is attached to Volume\"},{\"resourceId\":\"vpc-2d96be57\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::VPC\",\"name\":\"Is contained in Vpc\"}],\"configuration\":{\"amiLaunchIndex\":0,\"imageId\":\"ami-09d95fab7fff3776c\",\"instanceId\":\"i-042dd005362091826\",\"instanceType\":\"t2.medium\",\"kernelId\":null,\"keyName\":\"mihaec2\",\"launchTime\":\"2023-04-27T14:57:16.000Z\",\"monitoring\":{\"state\":\"disabled\"},\"placement\":{\"availabilityZone\":\"us-east-1e\",\"affinity\":null,\"groupName\":\"\",\"partitionNumber\":null,\"hostId\":null,\"tenancy\":\"default\",\"spreadDomain\":null,\"hostResourceGroupArn\":null},\"platform\":null,\"privateDnsName\":\"ip-172-31-78-41.ec2.internal\",\"privateIpAddress\":\"172.31.78.41\",\"productCodes\":[],\"publicDnsName\":\"ec2-3-232-229-57.compute-1.amazonaws.com\",\"publicIpAddress\":\"3.232.229.57\",\"ramdiskId\":null,\"state\":{\"code\":80,\"name\":\"stopped\"},\"stateTransitionReason\":\"User initiated (2023-04-27 15:01:07 GMT)\",\"subnetId\":\"subnet-cad1f2f4\",\"vpcId\":\"vpc-2d96be57\",\"architecture\":\"x86_64\",\"blockDeviceMappings\":[{\"deviceName\":\"/dev/xvda\",\"ebs\":{\"attachTime\":\"2020-05-30T15:21:58.000Z\",\"deleteOnTermination\":true,\"status\":\"attached\",\"volumeId\":\"vol-0a288b5eb9fea4b30\"}}],\"clientToken\":\"\",\"ebsOptimized\":false,\"enaSupport\":true,\"hypervisor\":\"xen\",\"iamInstanceProfile\":{\"arn\":\"arn:aws:iam::0123456789012:instance-profile/AmazonSSMRoleForInstancesQuickSetup\",\"id\":\"AIPAS5S4WFUBL72S3QXW5\"},\"instanceLifecycle\":null,\"elasticGpuAssociations\":[],\"elasticInferenceAcceleratorAssociations\":[],\"networkInterfaces\":[{\"association\":{\"carrierIp\":null,\"ipOwnerId\":\"0123456789012\",\"publicDnsName\":\"ec2-3-232-229-57.compute-1.amazonaws.com\",\"publicIp\":\"3.232.229.57\"},\"attachment\":{\"attachTime\":\"2020-05-30T15:21:57.000Z\",\"attachmentId\":\"eni-attach-0a7e75dc9c1c291a0\",\"deleteOnTermination\":true,\"deviceIndex\":0,\"status\":\"attached\",\"networkCardIndex\":0},\"description\":\"\",\"groups\":[{\"groupName\":\"minhaec2\",\"groupId\":\"sg-0fb295a327d9b4835\"}],\"ipv6Addresses\":[],\"macAddress\":\"06:cf:00:c2:17:db\",\"networkInterfaceId\":\"eni-034dd31c4b17ada8c\",\"ownerId\":\"0123456789012\",\"privateDnsName\":\"ip-172-31-78-41.ec2.internal\",\"privateIpAddress\":\"172.31.78.41\",\"privateIpAddresses\":[{\"association\":{\"carrierIp\":null,\"ipOwnerId\":\"0123456789012\",\"publicDnsName\":\"ec2-3-232-229-57.compute-1.amazonaws.com\",\"publicIp\":\"3.232.229.57\"},\"primary\":true,\"privateDnsName\":\"ip-172-31-78-41.ec2.internal\",\"privateIpAddress\":\"172.31.78.41\"}],\"sourceDestCheck\":true,\"status\":\"in-use\",\"subnetId\":\"subnet-cad1f2f4\",\"vpcId\":\"vpc-2d96be57\",\"interfaceType\":\"interface\"},{\"association\":null,\"attachment\":{\"attachTime\":\"2020-11-26T23:46:04.000Z\",\"attachmentId\":\"eni-attach-0e6d150ebbd19966e\",\"deleteOnTermination\":false,\"deviceIndex\":1,\"status\":\"attached\",\"networkCardIndex\":0},\"description\":\"MINHAEC2AAAAAA\",\"groups\":[{\"groupName\":\"minhaec2\",\"groupId\":\"sg-0fb295a327d9b4835\"},{\"groupName\":\"default\",\"groupId\":\"sg-88105fa0\"}],\"ipv6Addresses\":[],\"macAddress\":\"06:0a:62:00:64:5f\",\"networkInterfaceId\":\"eni-09a604c0ec356b06f\",\"ownerId\":\"0123456789012\",\"privateDnsName\":\"ip-172-31-70-9.ec2.internal\",\"privateIpAddress\":\"172.31.70.9\",\"privateIpAddresses\":[{\"association\":null,\"primary\":true,\"privateDnsName\":\"ip-172-31-70-9.ec2.internal\",\"privateIpAddress\":\"172.31.70.9\"}],\"sourceDestCheck\":true,\"status\":\"in-use\",\"subnetId\":\"subnet-cad1f2f4\",\"vpcId\":\"vpc-2d96be57\",\"interfaceType\":\"interface\"}],\"outpostArn\":null,\"rootDeviceName\":\"/dev/xvda\",\"rootDeviceType\":\"ebs\",\"securityGroups\":[{\"groupName\":\"minhaec2\",\"groupId\":\"sg-0fb295a327d9b4835\"}],\"sourceDestCheck\":true,\"spotInstanceRequestId\":null,\"sriovNetSupport\":null,\"stateReason\":{\"code\":\"Client.UserInitiatedShutdown\",\"message\":\"Client.UserInitiatedShutdown: User initiated shutdown\"},\"tags\":[{\"key\":\"projeto\",\"value\":\"meetup\"},{\"key\":\"Name\",\"value\":\"Minha\"},{\"key\":\"CentroCusto\",\"value\":\"TI\"},{\"key\":\"Setor\",\"value\":\"Desenvolvimento\"}],\"virtualizationType\":\"hvm\",\"cpuOptions\":{\"coreCount\":2,\"threadsPerCore\":1},\"capacityReservationId\":null,\"capacityReservationSpecification\":{\"capacityReservationPreference\":\"open\",\"capacityReservationTarget\":null},\"hibernationOptions\":{\"configured\":false},\"licenses\":[],\"metadataOptions\":{\"state\":\"applied\",\"httpTokens\":\"optional\",\"httpPutResponseHopLimit\":1,\"httpEndpoint\":\"enabled\"},\"enclaveOptions\":{\"enabled\":false},\"bootMode\":null},\"supplementaryConfiguration\":{},\"tags\":{\"projeto\":\"meetup\",\"Setor\":\"Desenvolvimento\",\"CentroCusto\":\"TI\",\"Name\":\"Minha\"},\"configurationItemVersion\":\"1.3\",\"configurationItemCaptureTime\":\"2023-04-27T15:03:11.636Z\",\"configurationStateId\":1682607791636,\"awsAccountId\":\"0123456789012\",\"configurationItemStatus\":\"OK\",\"resourceType\":\"AWS::EC2::Instance\",\"resourceId\":\"i-042dd005362091826\",\"resourceName\":null,\"ARN\":\"arn:aws:ec2:us-east-1:0123456789012:instance/i-042dd005362091826\",\"awsRegion\":\"us-east-1\",\"availabilityZone\":\"us-east-1e\",\"configurationStateMd5Hash\":\"\",\"resourceCreationTime\":\"2023-04-27T14:57:16.000Z\"},\"notificationCreationTime\":\"2023-04-27T15:03:13.332Z\",\"messageType\":\"ConfigurationItemChangeNotification\",\"recordVersion\":\"1.3\"}",
    "ruleParameters":"{\"desiredInstanceType\": \"t2.micro\"}",
    "resultToken":"eyJlbmNyeXB0ZWREYXRhIjpbLTQxLDEsLTU3LC0zMCwtMTIxLDUzLDUyLDQ1LC01NywtOCw3MywtODEsLTExNiwtMTAyLC01MiwxMTIsLTQ3LDU4LDY1LC0xMjcsMTAyLDUsLTY5LDQ0LC0xNSwxMTQsNDEsLTksMTExLC0zMCw2NSwtNzUsLTM1LDU0LDEwNSwtODksODYsNDAsLTEwNSw5OCw2NSwtMTE5LC02OSwyNCw2NiwtMjAsODAsLTExMiwtNzgsLTgwLDQzLC01NywzMCwtMjUsODIsLTEwLDMsLTQsLTg1LC01MywtMzcsLTkwLC04OCwtOTgsLTk4LC00MSwxOSwxMTYsNjIsLTIzLC0xMjEsLTEwOCw1NywtNTgsLTUyLDI5LDEwMSwxMjIsLTU2LC03MSwtODEsLTQ3LDc3LC0yMiwtMTI0LC0zLC04NiwtMTIyLC00MCwtODksLTEwMSw1NywtMTI3LC0zNywtMzcsLTMxLC05OCwtMzEsMTEsLTEyNSwwLDEwOCwtMzIsNjQsNjIsLTIyLDAsNDcsLTEwNiwtMTAwLDEwNCwxNCw1OCwxMjIsLTEwLC01MCwtOTAsLTgwLC01MCwtNSw2NSwwLC0yNSw4NSw4Miw3LDkzLDEyMiwtODIsLTExNiwtNzksLTQ0LDcyLC03MywtNjksMTQsLTU2LDk0LDkwLDExNCwtMjksLTExOSwtNzEsODgsMTA3LDEwNywxMTAsLTcsMTI3LC0xMjUsLTU3LC0xMjYsLTEyMCw2OSwtMTI3LC03NiwtMTE5LDcxLDEsLTY4LDEwNywxMTMsLTU2LDg3LC0xMDIsLTE2LDEwOCwtMTA3LC00MywtOTQsLTEwNiwzLDkwLDE0LDcyLC0xMiwtMTE2LC03Myw4MCwtMTIyLDQ0LC0xMDQsMTIsNzQsNTcsLTEwLC0xMDUsLTExMiwtMzYsMjgsLTQ1LDk3LDExLC00OSwtMTEsNjEsMzYsLTE3LC03NCw1MCw0LC0yNiwxMDQsLTI4LC0xMjUsMjQsNzAsLTg1LC00Niw5MiwtMTAzLC00MSwtMTA2LDY5LDEyMiwyMSwtMjUsODAsOTksLTkzLC01NiwtMjUsLTQ3LC0xMjMsLTU5LC0xMjQsLTUyLC0xNiwxMjcsLTM4LC0xNiwxMDEsMTE5LDEwNywyNywxMCwtNDYsLTg3LC0xMiwtMzksMTQsNDUsMiw3MCwxMDcsMTA0LC00LC02OSwtMTIsNTksLTEyNiwtOTEsMTI3LDU0LDEwNiwtMTI2LC0xMTYsLTEwMiw3Miw4MSw1MCw3NSwtNTEsMTA4LDQxLC0zLC02LC00NSwxMDMsLTg2LDM3LC00NiwtMzIsLTExMSwxMjQsMTExLDg3LDU0LC03NiwxMjIsLTUsLTM2LC04OCw5LC0xMTMsMTE2LC01OSw4Myw3NywyOCwxMiwtNjUsLTExMywtNzksLTEyOCw4MiwtMTE4LC04MywtMTI0LDMxLDk5LC05MCwtOTksMTYsLTEyMywyMSwtMTE0LC05OCwtMTE2LC0xMTksMiwtNzMsNDYsODIsLTEzLDU0LDcxLC00MiwyNSw3NCw3MywtODYsOTQsNDYsOTksOTMsLTgyLDU1LDY1LC05OCw0OSwtNjAsMTEyLDEwMSwyMiw2OSwtMTYsNzcsLTk0LC01OSwtNDYsMTE1LDMwLC00Myw5Myw4OCwtMjgsMzgsNiw4NCwzMSwtMTAxLDMyLC0yMiwtNjMsLTk1LDExNCwtNzUsMTE0LDM2LC04NCw0MCwtNDQsLTEzLDU5LDcyLC0xLC0xMDMsMzEsMTA1LDY5LDY5LDc3LC02NCwtNTYsMTE4LDEzLC0xMTQsODAsOTksLTUzLDI1LDQyLDk0LDczLC04MCwyNSwzOCwyNCwtMTcsNjYsLTExOCwtMjMsMTE5LDkwLDEyMSwxMTgsLTUxLDUxLC0xMiwtNzYsLTUxLDksLTIxLDExNCwtMzcsLTY0LC0yLC0xMjYsLTk1LDYzLDczLC00MSwtMzQsLTkwLC0yMiw1OSwtNzksMzAsLTQsLTEsLTUsMTIsMzksLTk5LC0xMDUsLTEwNCwtNjEsNjUsLTc0LDE5LC0xMywtNjAsLTI4LC04LDQsLTgsMTIxLC0xMTgsMTIyLC02NSwtMjEsMjMsMTcsLTg0LDQwLC05MiwxNCwtMTI2LC02MCwtNzksLTUzLDM3LC04Myw2NSwxMDQsLTM2LC02MCwtMTEwLC0zMywtMTE3LDYsMTA3LDEsLTMsOTMsNzgsLTk1LC0xMjIsNTMsMTA4LC00OSwtNDksMjQsLTY1LDgzLDEyNSwtNzcsLTE5LC04MSwzNCwtNjcsLTQzLC03MCwtMjYsMTgsMTA0LDY1LDQsLTEyNiw0NCwtMTE5LDUyLC00NiwyMiw2NywxMTMsMTE4LC0zMywzNCwtOTYsMTIxLDE5LC0yLC0zNSwwLC04MiwxNyw2NiwtMjcsNjksLTM2LC0xNCw1NiwtOTcsLTE2LDEyMywyOCwtOTUsLTMyLC02MywtNjksNzAsNjQsLTMzLC0xMDAsNDMsLTExMywxMDUsMTAwLDEwOCwtNjAsNDAsLTIsLTk2LC0xMjQsMzcsLTQ1LC0xMjQsLTY4LC02OSwtMTIzLDE3LC02LDg2LC01OSwtOTQsMTEwLDczLDU3LC0xMTYsMTA3LC00MSwtOTQsLTExOCwtMTI2LDEwLC04MCwtNzAsMTAyLDg4LC0xMjYsODcsLTI3LC0xMDEsLTk0LC0zNSwtMTA2LC02LC03MiwtODYsNTAsMTE2LC0yOCw5MCwxMywtMTIwLDYsMjcsOTIsNTYsLTkwLDM5LDQ5LC0xMywtODYsLTI1LC04NiwxMTMsLTEzLDQxLC0xMTksOTQsLTk0LC0xMDMsLTgzLC02MCwxMjcsLTE1LC0zOSwxMTksLTk1LDI3LDQ0LDExNiwxMDksNywtMTAyLC0xNyw0OCwtODIsLTMxLC04LC02OSwzNSw5NCw1NCwtNTUsMSwtMTE5LDU3LC0xMDgsLTMsLTkxLC0xMjIsLTUzLC04OCw0LC05NywtMzUsMTI2LDExOSw1OSwtMSw4NSw3MywtNTgsLTEyMCwtNjQsMTE5LC0xMTIsOTIsMTksOSwtNjYsLTkyLDEwOCwtMTEsLTQyLDExMSwtMTA0LC0xMjAsMjcsLTEwMywtNjksMTksMTExLDEyLDIzLDEwNyw1NCw0MSwtMjYsNjAsLTMxLC01XSwibWF0ZXJpYWxTZXRTZXJpYWxOdW1iZXIiOjEsIml2UGFyYW1ldGVyU3BlYyI6eyJpdiI6Wy05NSwzMiwxMDgsOTEsMzUsLTgyLC0zNywyNCwtNDQsLTExNSwtODIsLTEyOCwtMTIyLDMsNTMsLTI0XX19",
    "eventLeftScope":false,
    "executionRoleArn":"arn:aws:iam::0123456789012:role/aws-service-role/config.amazonaws.com/AWSServiceRoleForConfig",
    "configRuleArn":"arn:aws:config:us-east-1:0123456789012:config-rule/config-rule-i9y8j9",
    "configRuleName":"MyRule",
    "configRuleId":"config-rule-i9y8j9",
    "accountId":"0123456789012",
    "evaluationMode":"DETECTIVE"
 }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
{
    "invokingEvent": "{\"configurationItemSummary\": {\"changeType\": \"UPDATE\",\"configurationItemVersion\": \"1.2\",\"configurationItemCaptureTime\":\"2016-10-06T16:46:16.261Z\",\"configurationStateId\": 0,\"awsAccountId\":\"123456789012\",\"configurationItemStatus\": \"OK\",\"resourceType\": \"AWS::EC2::Instance\",\"resourceId\":\"i-00000000\",\"resourceName\":null,\"ARN\":\"arn:aws:ec2:us-west-2:123456789012:instance/i-00000000\",\"awsRegion\": \"us-west-2\",\"availabilityZone\":\"us-west-2a\",\"configurationStateMd5Hash\":\"8f1ee69b287895a0f8bc5753eca68e96\",\"resourceCreationTime\":\"2016-10-06T16:46:10.489Z\"},\"messageType\":\"OversizedConfigurationItemChangeNotification\", \"notificationCreationTime\": \"2016-10-06T16:46:16.261Z\", \"recordVersion\": \"1.0\"}",
    "ruleParameters": "{\"myParameterKey\":\"myParameterValue\"}",
    "resultToken": "myResultToken",
    "eventLeftScope": false,
    "executionRoleArn": "arn:aws:iam::123456789012:role/config-role",
    "configRuleArn": "arn:aws:config:us-east-2:123456789012:config-rule/config-rule-ec2-managed-instance-inventory",
    "configRuleName": "change-triggered-config-rule",
    "configRuleId": "config-rule-0123456",
    "accountId": "123456789012",
    "version": "1.0"
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
{
    "version":"1.0",
    "invokingEvent":"{\"awsAccountId\":\"0123456789012\",\"notificationCreationTime\":\"2023-04-27T13:26:17.741Z\",\"messageType\":\"ScheduledNxotification\",\"recordVersion\":\"1.0\"}",
    "ruleParameters":"{\"test\":\"x\"}",
    "resultToken":"eyJlbmNyeXB0ZWREYXRhIjpbLTQyLDEyNiw1MiwtMzcsLTI5LDExNCwxMjYsLTk3LDcxLDIyLC0xMTAsMTEyLC0zMSwtOTMsLTQ5LC0xMDEsODIsMyw1NCw0OSwzLC02OSwtNzEsLTcyLDYyLDgxLC03MiwtODEsNTAsMzUsLTUwLC03NSwtMTE4LC0xMTgsNzcsMTIsLTEsMTQsMTIwLC03MCwxMTAsLTMsNTAsLTYwLDEwNSwtNTcsNDUsMTAyLC0xMDksLTYxLC0xMDEsLTYxLDQsNDcsLTg0LC0yNSwxMTIsNTQsLTcxLC0xMDksNDUsMTksMTIzLC0yNiwxMiwtOTYsLTczLDU0LC0xMDksOTIsNDgsLTU5LC04MywtMzIsODIsLTM2LC05MCwxOSw5OCw3Nyw3OCw0MCw4MCw3OCwtMTA1LDg3LC0xMTMsLTExNiwtNzIsMzAsLTY4LC00MCwtODksMTA5LC0xMDgsLTEyOCwyMiw3Miw3NywtMjEsNzYsODksOTQsLTU5LDgxLC0xMjEsLTEwNywtNjcsNjMsLTcsODIsLTg5LC00NiwtMzQsLTkyLDEyMiwtOTAsMTcsLTEyMywyMCwtODUsLTU5LC03MCw4MSwyNyw2Miw3NCwtODAsODAsMzcsNDAsMTE2LDkxLC0yNCw1MSwtNDEsLTc5LDI4LDEyMCw1MywtMTIyLC04MywxMjYsLTc4LDI1LC05OCwtMzYsMTMsMzIsODYsLTI1LDQ4LDMsLTEwMiwtMTYsMjQsLTMsODUsNDQsLTI4LDE0LDIyLDI3LC0xMjIsMTE4LDEwMSw3Myw1LDE4LDU4LC02NCwyMywtODYsLTExNCwyNCwwLDEwMCwyLDExNywtNjIsLTExOSwtMTI4LDE4LDY1LDkwLDE0LC0xMDIsMjEsODUsMTAwLDExNyw1NSwyOSwxMjcsNTQsNzcsNzIsNzQsMzIsNzgsMywtMTExLDExOCwtNzAsLTg2LDEyNywtNzQsNjAsMjIsNDgsMzcsODcsMTMsMCwtMTA1LDUsLTEyMiwtNzEsLTEwMCwxMDQsLTEyNiwtMTYsNzksLTMwLDEyMCw3NywtNzYsLTQxLC0xMDksMiw5NywtMTAxLC0xLDE1LDEyMywxMTksMTA4LDkxLC0yMCwtMTI1LC05NiwyLC05MiwtMTUsLTY3LC03NiwxMjEsMTA0LDEwNSw2NCwtNjIsMTAyLDgsNCwxMjEsLTQ1LC04MCwtODEsLTgsMTE4LDQ0LC04MiwtNDEsLTg0LDczLC0zNiwxMTcsODAsLTY5LC03MywxNCwtMTgsNzIsMzEsLTUsLTExMSwtMTI3LC00MywzNCwtOCw1NywxMDMsLTQyLDE4LC0zMywxMTcsLTI2LC0xMjQsLTEyNCwxNSw4OCwyMywxNiwtNTcsNTQsLTYsLTEwMiwxMTYsLTk5LC00NSwxMDAsLTM1LDg3LDM3LDYsOTgsMiwxMTIsNjAsLTMzLDE3LDI2LDk5LC0xMDUsNDgsLTEwNCwtMTE5LDc4LDYsLTU4LDk1LDksNDEsLTE2LDk2LDQxLC0yMiw5Niw3MiwxMTYsLTk1LC0xMDUsLTM2LC0xMjMsLTU1LDkxLC00NiwtNywtOTIsMzksNDUsODQsMTYsLTEyNCwtMTIyLC02OCwxLC0yOCwxMjIsLTYwLDgyLDEwMywtNTQsLTkyLDI3LC05OSwtMTI4LDY1LDcsLTcyLC0xMjcsNjIsLTIyLDIsLTExLDE4LC04OSwtMTA2LC03NCw3MSw4NiwtMTE2LC0yNSwtMTE1LC05Niw1NywtMzQsMjIsLTEyNCwtMTI1LC00LC00MSw0MiwtNTcsLTEwMyw0NSw3OCwxNCwtMTA2LDExMSw5OCwtOTQsLTcxLDUsNzUsMTksLTEyNCwtMzAsMzQsLTUwLDc1LC04NCwtNTAsLTU2LDUxLC0xNSwtMzYsNjEsLTk0LC03OSwtNDUsMTI2LC03NywtMTA1LC0yLC05MywtNiw4LC0zLDYsLTQyLDQ2LDEyNSw1LC05OCwxMyw2NywtMTAsLTEzLC05NCwtNzgsLTEyNywxMjEsLTI2LC04LC0xMDEsLTkxLDEyMSwtNDAsLTEyNCwtNjQsODQsLTcyLDYzLDE5LC04NF0sIm1hdGVyaWFsU2V0U2VyaWFsTnVtYmVyIjoxLCJpdlBhcmFtZXRlclNwZWMiOnsiaXYiOlszLC0xMCwtODUsMTE0LC05MCwxMTUsNzcsNTUsNTQsMTUsMzgsODQsLTExNiwxNCwtNDAsMjhdfX0=",
    "eventLeftScope":false,
    "executionRoleArn":"arn:aws:iam::0123456789012:role/aws-service-role/config.amazonaws.com/AWSServiceRoleForConfig",
    "configRuleArn":"arn:aws:config:us-east-1:0123456789012:config-rule/config-rule-pdmyw1",
    "configRuleName":"rule-ec2-test",
    "configRuleId":"config-rule-pdmyw1",
    "accountId":"0123456789012",
    "evaluationMode":"DETECTIVE"
 }

Bedrock Agent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import BedrockAgentEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=BedrockAgentEvent)
def lambda_handler(event: BedrockAgentEvent, context: LambdaContext) -> dict:
    input_text = event.input_text

    logger.info(f"Bedrock Agent {event.action_group} invoked with input", input_text=input_text)

    return {
        "message_version": "1.0",
        "responses": [
            {
                "action_group": event.action_group,
                "api_path": event.api_path,
                "http_method": event.http_method,
                "http_status_code": 200,
                "response_body": {"application/json": {"body": "This is the response"}},
            },
        ],
    }

CloudFormation Custom Resource

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
    CloudFormationCustomResourceEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=CloudFormationCustomResourceEvent)
def lambda_handler(event: CloudFormationCustomResourceEvent, context: LambdaContext):
    request_type = event.request_type

    if request_type == "Create":
        return on_create(event)
    if request_type == "Update":
        return on_update(event)
    if request_type == "Delete":
        return on_delete(event)


def on_create(event: CloudFormationCustomResourceEvent):
    props = event.resource_properties
    logger.info(f"Create new resource with props {props}.")

    # Add your create code here ...
    physical_id = ...

    return {"PhysicalResourceId": physical_id}


def on_update(event: CloudFormationCustomResourceEvent):
    physical_id = event.physical_resource_id
    props = event.resource_properties
    logger.info(f"Update resource {physical_id} with props {props}.")
    # ...


def on_delete(event: CloudFormationCustomResourceEvent):
    physical_id = event.physical_resource_id
    logger.info(f"Delete resource {physical_id}.")
    # ...

CloudWatch Dashboard Custom Widget

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from aws_lambda_powertools.utilities.data_classes import event_source, CloudWatchDashboardCustomWidgetEvent

const DOCS = `
## Echo
A simple echo script. Anything passed in \`\`\`echo\`\`\` parameter is returned as the content of custom widget.

### Widget parameters
| Param    | Description              |
| -------- | ------------------------ |
| **echo** | The content to echo back |

### Example parameters
\`\`\` yaml
echo: <h1>Hello world</h1>
\`\`\`
`

@event_source(data_class=CloudWatchDashboardCustomWidgetEvent)
def lambda_handler(event: CloudWatchDashboardCustomWidgetEvent, context):

    if event.describe:
        return DOCS

    # You can directly return HTML or JSON content
    # Alternatively, you can return markdown that will be rendered by CloudWatch
    echo = event.widget_context.params["echo"]
    return { "markdown": f"# {echo}" }

CloudWatch Alarm State Change Action

CloudWatch supports Lambda as an alarm state change action. You can use the CloudWathAlarmEvent data class to access the fields containing such data as alarm information, current state, and previous state.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import CloudWatchAlarmEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=CloudWatchAlarmEvent)
def lambda_handler(event: CloudWatchAlarmEvent, context: LambdaContext) -> dict:
    logger.info(f"Alarm {event.alarm_data.alarm_name} state is {event.alarm_data.state.value}")

    # You can now work with event. For example, you can enrich the received data, and
    # decide on how you want to route the alarm.

    return {
        "name": event.alarm_data.alarm_name,
        "arn": event.alarm_arn,
        "urgent": "Priority: P1" in (event.alarm_data.configuration.description or ""),
    }

CloudWatch Logs

CloudWatch Logs events by default are compressed and base64 encoded. You can use the helper function provided to decode, decompress and parse json data from the event.

1
2
3
4
5
6
7
8
9
from aws_lambda_powertools.utilities.data_classes import event_source, CloudWatchLogsEvent
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData

@event_source(data_class=CloudWatchLogsEvent)
def lambda_handler(event: CloudWatchLogsEvent, context):
    decompressed_log: CloudWatchLogsDecodedData = event.parse_logs_data()
    log_events = decompressed_log.log_events
    for event in log_events:
        do_something_with(event.timestamp, event.message)

Kinesis integration

When streaming CloudWatch Logs to a Kinesis Data Stream (cross-account or not), you can use extract_cloudwatch_logs_from_event to decode, decompress and extract logs as CloudWatchLogsDecodedData to ease log processing.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from typing import List

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
    KinesisStreamEvent, extract_cloudwatch_logs_from_event)


@event_source(data_class=KinesisStreamEvent)
def simple_handler(event: KinesisStreamEvent, context):
    logs: List[CloudWatchLogsDecodedData] = extract_cloudwatch_logs_from_event(event)
    for log in logs:
        if log.message_type == "DATA_MESSAGE":
            return "success"
    return "nothing to be processed"

Alternatively, you can use extract_cloudwatch_logs_from_record to seamless integrate with the Batch utility for more robust log processing.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType,
                                                   batch_processor)
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
    KinesisStreamRecord, extract_cloudwatch_logs_from_record)

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)


def record_handler(record: KinesisStreamRecord):
    log = extract_cloudwatch_logs_from_record(record)
    return log.message_type == "DATA_MESSAGE"


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    return processor.response()

CodeDeploy LifeCycle Hook

CodeDeploy triggers Lambdas with this event when defined in AppSpec definitions to test applications at different stages of deployment.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
    event_source,
    CodeDeployLifecycleHookEvent,
)

logger = Logger()

def lambda_handler(
    event: CodeDeployLifecycleHookEvent, context: LambdaContext
) -> None:
    deployment_id = event.deployment_id
    lifecycle_event_hook_execution_id = event.lifecycle_event_hook_execution_id

CodePipeline Job

Data classes and utility functions to help create continuous delivery pipelines tasks with AWS Lambda

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source, CodePipelineJobEvent

logger = Logger()

@event_source(data_class=CodePipelineJobEvent)
def lambda_handler(event, context):
    """The Lambda function handler

    If a continuing job then checks the CloudFormation stack status
    and updates the job accordingly.

    If a new job then kick of an update or creation of the target
    CloudFormation stack.
    """

    # Extract the Job ID
    job_id = event.get_id

    # Extract the params
    params: dict = event.decoded_user_parameters
    stack = params["stack"]
    artifact_name = params["artifact"]
    template_file = params["file"]

    try:
        if event.data.continuation_token:
            # If we're continuing then the create/update has already been triggered
            # we just need to check if it has finished.
            check_stack_update_status(job_id, stack)
        else:
            template = event.get_artifact(artifact_name, template_file)
            # Kick off a stack update or create
            result = start_update_or_create(job_id, stack, template)
            artifact: io.BytesIO = zip_data(result)
            event.put_artifact(
                artifact_name=event.data.output_artifacts[0].name,
                body=artifact,
                content_type="application/zip"
            )
    except Exception as e:
        # If any other exceptions which we didn't expect are raised
        # then fail the job and log the exception message.
        logger.exception("Function failed due to exception.")
        put_job_failure(job_id, "Function exception: " + str(e))

    logger.debug("Function complete.")
    return "Complete."

Cognito User Pool

Cognito User Pools have several different Lambda trigger sources, all of which map to a different data class, which can be imported from aws_lambda_powertools.data_classes.cognito_user_pool_event:

Trigger/Event Source Data Class
Custom message event data_classes.cognito_user_pool_event.CustomMessageTriggerEvent
Post authentication data_classes.cognito_user_pool_event.PostAuthenticationTriggerEvent
Post confirmation data_classes.cognito_user_pool_event.PostConfirmationTriggerEvent
Pre authentication data_classes.cognito_user_pool_event.PreAuthenticationTriggerEvent
Pre sign-up data_classes.cognito_user_pool_event.PreSignUpTriggerEvent
Pre token generation data_classes.cognito_user_pool_event.PreTokenGenerationTriggerEvent
Pre token generation V2 data_classes.cognito_user_pool_event.PreTokenGenerationV2TriggerEvent
User migration data_classes.cognito_user_pool_event.UserMigrationTriggerEvent
Define Auth Challenge data_classes.cognito_user_pool_event.DefineAuthChallengeTriggerEvent
Create Auth Challenge data_classes.cognito_user_pool_event.CreateAuthChallengeTriggerEvent
Verify Auth Challenge data_classes.cognito_user_pool_event.VerifyAuthChallengeResponseTriggerEvent
Custom Email Sender data_classes.cognito_user_pool_event.CustomEmailSenderTriggerEvent
Custom SMS Sender data_classes.cognito_user_pool_event.CustomSMSSenderTriggerEvent

Post Confirmation Example

1
2
3
4
5
6
7
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import PostConfirmationTriggerEvent

def lambda_handler(event, context):
    event: PostConfirmationTriggerEvent = PostConfirmationTriggerEvent(event)

    user_attributes = event.request.user_attributes
    do_something_with(user_attributes)

Define Auth Challenge Example

Note

In this example we are modifying the wrapped dict response fields, so we need to return the json serializable wrapped event in event.raw_event.

This example is based on the AWS Cognito docs for Define Auth Challenge Lambda Trigger.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import DefineAuthChallengeTriggerEvent

def handler(event: dict, context) -> dict:
    event: DefineAuthChallengeTriggerEvent = DefineAuthChallengeTriggerEvent(event)
    if (
        len(event.request.session) == 1
        and event.request.session[0].challenge_name == "SRP_A"
    ):
        event.response.issue_tokens = False
        event.response.fail_authentication = False
        event.response.challenge_name = "PASSWORD_VERIFIER"
    elif (
        len(event.request.session) == 2
        and event.request.session[1].challenge_name == "PASSWORD_VERIFIER"
        and event.request.session[1].challenge_result
    ):
        event.response.issue_tokens = False
        event.response.fail_authentication = False
        event.response.challenge_name = "CUSTOM_CHALLENGE"
    elif (
        len(event.request.session) == 3
        and event.request.session[2].challenge_name == "CUSTOM_CHALLENGE"
        and event.request.session[2].challenge_result
    ):
        event.response.issue_tokens = True
        event.response.fail_authentication = False
    else:
        event.response.issue_tokens = False
        event.response.fail_authentication = True

    return event.raw_event
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
    "version": "1",
    "region": "us-east-1",
    "userPoolId": "us-east-1_example",
    "userName": "UserName",
    "callerContext": {
        "awsSdkVersion": "awsSdkVersion",
        "clientId": "clientId"
    },
    "triggerSource": "DefineAuthChallenge_Authentication",
    "request": {
        "userAttributes": {
            "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
            "email_verified": "true",
            "name": "First Last",
            "email": "define-auth@mail.com"
        },
        "session": [
            {
                "challengeName": "SRP_A",
                "challengeResult": true
            }
        ]
    },
    "response": {
        "issueTokens": false,
        "failAuthentication": false,
        "challengeName": "PASSWORD_VERIFIER"
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
{
    "version": "1",
    "region": "us-east-1",
    "userPoolId": "us-east-1_example",
    "userName": "UserName",
    "callerContext": {
        "awsSdkVersion": "awsSdkVersion",
        "clientId": "clientId"
    },
    "triggerSource": "DefineAuthChallenge_Authentication",
    "request": {
        "userAttributes": {
            "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
            "email_verified": "true",
            "name": "First Last",
            "email": "define-auth@mail.com"
        },
        "session": [
            {
                "challengeName": "SRP_A",
                "challengeResult": true
            },
            {
                "challengeName": "PASSWORD_VERIFIER",
                "challengeResult": true
            }
        ]
    },
    "response": {
        "issueTokens": false,
        "failAuthentication": false,
        "challengeName": "CUSTOM_CHALLENGE"
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
{
    "version": "1",
    "region": "us-east-1",
    "userPoolId": "us-east-1_example",
    "userName": "UserName",
    "callerContext": {
        "awsSdkVersion": "awsSdkVersion",
        "clientId": "clientId"
    },
    "triggerSource": "DefineAuthChallenge_Authentication",
    "request": {
        "userAttributes": {
            "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
            "email_verified": "true",
            "name": "First Last",
            "email": "define-auth@mail.com"
        },
        "session": [
            {
                "challengeName": "SRP_A",
                "challengeResult": true
            },
            {
                "challengeName": "PASSWORD_VERIFIER",
                "challengeResult": true
            },
            {
                "challengeName": "CUSTOM_CHALLENGE",
                "challengeResult": true
            }
        ]
    },
    "response": {
        "issueTokens": true,
        "failAuthentication": false
    }
}

Create Auth Challenge Example

This example is based on the AWS Cognito docs for Create Auth Challenge Lambda Trigger.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import CreateAuthChallengeTriggerEvent

@event_source(data_class=CreateAuthChallengeTriggerEvent)
def handler(event: CreateAuthChallengeTriggerEvent, context) -> dict:
    if event.request.challenge_name == "CUSTOM_CHALLENGE":
        event.response.public_challenge_parameters = {"captchaUrl": "url/123.jpg"}
        event.response.private_challenge_parameters = {"answer": "5"}
        event.response.challenge_metadata = "CAPTCHA_CHALLENGE"
    return event.raw_event

Verify Auth Challenge Response Example

This example is based on the AWS Cognito docs for Verify Auth Challenge Response Lambda Trigger.

1
2
3
4
5
6
7
8
9
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import VerifyAuthChallengeResponseTriggerEvent

@event_source(data_class=VerifyAuthChallengeResponseTriggerEvent)
def handler(event: VerifyAuthChallengeResponseTriggerEvent, context) -> dict:
    event.response.answer_correct = (
        event.request.private_challenge_parameters.get("answer") == event.request.challenge_answer
    )
    return event.raw_event

Connect Contact Flow

New in 1.11.0

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event import (
    ConnectContactFlowChannel,
    ConnectContactFlowEndpointType,
    ConnectContactFlowEvent,
    ConnectContactFlowInitiationMethod,
)

def lambda_handler(event, context):
    event: ConnectContactFlowEvent = ConnectContactFlowEvent(event)
    assert event.contact_data.attributes == {"Language": "en-US"}
    assert event.contact_data.channel == ConnectContactFlowChannel.VOICE
    assert event.contact_data.customer_endpoint.endpoint_type == ConnectContactFlowEndpointType.TELEPHONE_NUMBER
    assert event.contact_data.initiation_method == ConnectContactFlowInitiationMethod.API

DynamoDB Streams

The DynamoDB data class utility provides the base class for DynamoDBStreamEvent, as well as enums for stream view type (StreamViewType) and event type. (DynamoDBRecordEventName). The class automatically deserializes DynamoDB types into their equivalent Python types.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
    DynamoDBStreamEvent,
    DynamoDBRecordEventName
)

def lambda_handler(event, context):
    event: DynamoDBStreamEvent = DynamoDBStreamEvent(event)

    # Multiple records can be delivered in a single event
    for record in event.records:
        if record.event_name == DynamoDBRecordEventName.MODIFY:
            do_something_with(record.dynamodb.new_image)
            do_something_with(record.dynamodb.old_image)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    for record in event.records:
        # {"N": "123.45"} => Decimal("123.45")
        key: str = record.dynamodb.keys["id"]
        print(key)

EventBridge

1
2
3
4
5
from aws_lambda_powertools.utilities.data_classes import event_source, EventBridgeEvent

@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context):
    do_something_with(event.detail)

Kafka

This example is based on the AWS docs for Amazon MSK and self-managed Apache Kafka.

1
2
3
4
5
6
from aws_lambda_powertools.utilities.data_classes import event_source, KafkaEvent

@event_source(data_class=KafkaEvent)
def lambda_handler(event: KafkaEvent, context):
    for record in event.records:
        do_something_with(record.decoded_key, record.json_value)

Kinesis streams

Kinesis events by default contain base64 encoded data. You can use the helper function to access the data either as json or plain text, depending on the original payload.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from aws_lambda_powertools.utilities.data_classes import event_source, KinesisStreamEvent

@event_source(data_class=KinesisStreamEvent)
def lambda_handler(event: KinesisStreamEvent, context):
    kinesis_record = next(event.records).kinesis

    # if data was delivered as text
    data = kinesis_record.data_as_text()

    # if data was delivered as json
    data = kinesis_record.data_as_json()

    do_something_with(data)

Kinesis Firehose delivery stream

When using Kinesis Firehose, you can use a Lambda function to perform data transformation. For each transformed record, you can choose to either:

  • A) Put them back to the delivery stream (default)
  • B) Drop them so consumers don't receive them (e.g., data validation)
  • C) Indicate a record failed data transformation and should be retried

To do that, you can use KinesisFirehoseDataTransformationResponse class along with helper functions to make it easier to decode and encode base64 data in the stream.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
    event_source,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    result = KinesisFirehoseDataTransformationResponse()

    for record in event.records:
        # get original data using data_as_text property
        data = record.data_as_text  # (1)!

        ## generate data to return
        transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data}

        processed_record = record.build_data_transformation_response(
            data=base64_from_json(transformed_data),  # (2)!
        )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()
  1. Ingesting JSON payloads?

    Use record.data_as_json to easily deserialize them.
  2. For your convenience, base64_from_json serializes a dict to JSON, then encode as base64 data.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from json import JSONDecodeError
from typing import Dict

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
    event_source,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    result = KinesisFirehoseDataTransformationResponse()

    for record in event.records:
        try:
            payload: Dict = record.data_as_json  # decodes and deserialize base64 JSON string

            ## generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}

            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )
        except JSONDecodeError:  # (1)!
            # our producers ingest JSON payloads only; drop malformed records from the stream
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=record.data,
                result="Dropped",
            )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()
  1. This exception would be generated from record.data_as_json if invalid payload.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
    event_source,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: dict, context: LambdaContext):
    firehose_event = KinesisFirehoseEvent(event)
    result = KinesisFirehoseDataTransformationResponse()

    for record in firehose_event.records:
        try:
            payload = record.data_as_text  # base64 decoded data as str

            # generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}

            # Default result is Ok
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )
        except Exception:
            # add Failed result to processing results, send back to kinesis for retry
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=record.data,
                result="ProcessingFailed",  # (1)!
            )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()
  1. This record will now be sent to your S3 bucket in the processing-failed folder.

Lambda Function URL

1
2
3
4
5
from aws_lambda_powertools.utilities.data_classes import event_source, LambdaFunctionUrlEvent

@event_source(data_class=LambdaFunctionUrlEvent)
def lambda_handler(event: LambdaFunctionUrlEvent, context):
    do_something_with(event.body)

Rabbit MQ

It is used for Rabbit MQ payloads, also see the blog post for more details.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent

logger = Logger()

@event_source(data_class=RabbitMQEvent)
def lambda_handler(event: RabbitMQEvent, context):
    for queue_name, messages in event.rmq_messages_by_queue.items():
        logger.debug(f"Messages for queue: {queue_name}")
        for message in messages:
            logger.debug(f"MessageID: {message.basic_properties.message_id}")
            data: Dict = message.json_data
            logger.debug("Process json in base64 encoded data str", data)

S3

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from urllib.parse import unquote_plus
from aws_lambda_powertools.utilities.data_classes import event_source, S3Event

@event_source(data_class=S3Event)
def lambda_handler(event: S3Event, context):
    bucket_name = event.bucket_name

    # Multiple records can be delivered in a single event
    for record in event.records:
        object_key = unquote_plus(record.s3.get_object.key)

        do_something_with(f"{bucket_name}/{object_key}")

S3 Batch Operations

This example is based on the AWS S3 Batch Operations documentation Example Lambda function for S3 Batch Operations.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import boto3
from botocore.exceptions import ClientError

from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent, S3BatchOperationResponse, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=S3BatchOperationEvent)
def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
    response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")

    task = event.task
    src_key: str = task.s3_key
    src_bucket: str = task.s3_bucket

    s3 = boto3.client("s3", region_name="us-east-1")

    try:
        dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
        result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        if error_code == "RequestTimeout":
            result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
        else:
            result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
    except Exception as e:
        result = task.build_task_batch_response("PermanentFailure", str(e))
    finally:
        response.add_result(result)

    return response.asdict()


def do_some_work(s3_client, src_bucket: str, src_key: str):
    ...

S3 Object Lambda

This example is based on the AWS Blog post Introducing Amazon S3 Object Lambda – Use Your Code to Process Data as It Is Being Retrieved from S3.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import boto3
import requests

from aws_lambda_powertools import Logger
from aws_lambda_powertools.logging.correlation_paths import S3_OBJECT_LAMBDA
from aws_lambda_powertools.utilities.data_classes.s3_object_event import S3ObjectLambdaEvent

logger = Logger()
session = boto3.session.Session()
s3 = session.client("s3")

@logger.inject_lambda_context(correlation_id_path=S3_OBJECT_LAMBDA, log_event=True)
def lambda_handler(event, context):
    event = S3ObjectLambdaEvent(event)

    # Get object from S3
    response = requests.get(event.input_s3_url)
    original_object = response.content.decode("utf-8")

    # Make changes to the object about to be returned
    transformed_object = original_object.upper()

    # Write object back to S3 Object Lambda
    s3.write_get_object_response(
        Body=transformed_object, RequestRoute=event.request_route, RequestToken=event.request_token
    )

    return {"status_code": 200}

S3 EventBridge Notification

1
2
3
4
5
6
from aws_lambda_powertools.utilities.data_classes import event_source, S3EventBridgeNotificationEvent

@event_source(data_class=S3EventBridgeNotificationEvent)
def lambda_handler(event: S3EventBridgeNotificationEvent, context):
    bucket_name = event.detail.bucket.name
    file_key = event.detail.object.key

Secrets Manager

AWS Secrets Manager rotation uses an AWS Lambda function to update the secret. Click here for more information about rotating AWS Secrets Manager secrets.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from aws_lambda_powertools.utilities import parameters
from aws_lambda_powertools.utilities.data_classes import SecretsManagerEvent, event_source

secrets_provider = parameters.SecretsProvider()


@event_source(data_class=SecretsManagerEvent)
def lambda_handler(event: SecretsManagerEvent, context):
    # Getting secret value using Parameter utility
    # See https://docs.powertools.aws.dev/lambda/python/latest/utilities/parameters/
    secret = secrets_provider.get(event.secret_id, VersionId=event.version_id, VersionStage="AWSCURRENT")

    # You need to work with secrets afterwards
    # Check more examples: https://github.com/aws-samples/aws-secrets-manager-rotation-lambdas

    return secret
1
2
3
4
5
{
    "SecretId":"arn:aws:secretsmanager:us-west-2:123456789012:secret:MyTestDatabaseSecret-a1b2c3",
    "ClientRequestToken":"550e8400-e29b-41d4-a716-446655440000",
    "Step":"createSecret"
}

SES

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from aws_lambda_powertools.utilities.data_classes import event_source, SESEvent

@event_source(data_class=SESEvent)
def lambda_handler(event: SESEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        mail = record.ses.mail
        common_headers = mail.common_headers

        do_something_with(common_headers.to, common_headers.subject)

SNS

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from aws_lambda_powertools.utilities.data_classes import event_source, SNSEvent

@event_source(data_class=SNSEvent)
def lambda_handler(event: SNSEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        message = record.sns.message
        subject = record.sns.subject

        do_something_with(subject, message)

SQS

1
2
3
4
5
6
7
from aws_lambda_powertools.utilities.data_classes import event_source, SQSEvent

@event_source(data_class=SQSEvent)
def lambda_handler(event: SQSEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        do_something_with(record.body)

VPC Lattice V2

You can register your Lambda functions as targets within an Amazon VPC Lattice service network. By doing this, your Lambda function becomes a service within the network, and clients that have access to the VPC Lattice service network can call your service using Payload V2.

Click here for more information about using AWS Lambda with Amazon VPC Lattice.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import VPCLatticeEventV2, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=VPCLatticeEventV2)
def lambda_handler(event: VPCLatticeEventV2, context: LambdaContext):
    logger.info(event.body)

    response = {
        "isBase64Encoded": False,
        "statusCode": 200,
        "statusDescription": "200 OK",
        "headers": {"Content-Type": "application/text"},
        "body": "VPC Lattice V2 Event ✨🎉✨",
    }

    return response
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
  "version": "2.0",
  "path": "/todos",
  "method": "GET",
  "headers": {
    "user_agent": "curl/7.64.1",
    "x-forwarded-for": "10.213.229.10",
    "host": "test-lambda-service-3908sdf9u3u.dkfjd93.vpc-lattice-svcs.us-east-2.on.aws",
    "accept": "*/*"
  },
  "queryStringParameters": {
    "order-id": "1"
  },
  "body": "{\"message\": \"Hello from Lambda!\"}",
  "requestContext": {
      "serviceNetworkArn": "arn:aws:vpc-lattice:us-east-2:123456789012:servicenetwork/sn-0bf3f2882e9cc805a",
      "serviceArn": "arn:aws:vpc-lattice:us-east-2:123456789012:service/svc-0a40eebed65f8d69c",
      "targetGroupArn": "arn:aws:vpc-lattice:us-east-2:123456789012:targetgroup/tg-6d0ecf831eec9f09",
      "identity": {
        "sourceVpcArn": "arn:aws:ec2:region:123456789012:vpc/vpc-0b8276c84697e7339",
        "type" : "AWS_IAM",
        "principal": "arn:aws:sts::123456789012:assumed-role/example-role/057d00f8b51257ba3c853a0f248943cf",
        "sessionName": "057d00f8b51257ba3c853a0f248943cf",
        "x509SanDns": "example.com"
      },
      "region": "us-east-2",
      "timeEpoch": "1696331543569073"
  }
}

VPC Lattice V1

You can register your Lambda functions as targets within an Amazon VPC Lattice service network. By doing this, your Lambda function becomes a service within the network, and clients that have access to the VPC Lattice service network can call your service.

Click here for more information about using AWS Lambda with Amazon VPC Lattice.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import VPCLatticeEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=VPCLatticeEvent)
def lambda_handler(event: VPCLatticeEvent, context: LambdaContext):
    logger.info(event.body)

    response = {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "application/text"},
        "body": "Event Response to VPC Lattice 🔥🚀🔥",
    }

    return response
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
{
    "raw_path": "/testpath",
    "method": "GET",
    "headers": {
      "user_agent": "curl/7.64.1",
      "x-forwarded-for": "10.213.229.10",
      "host": "test-lambda-service-3908sdf9u3u.dkfjd93.vpc-lattice-svcs.us-east-2.on.aws",
      "accept": "*/*"
    },
    "query_string_parameters": {
      "order-id": "1"
    },
    "body": "eyJ0ZXN0IjogImV2ZW50In0=",
    "is_base64_encoded": true
  }

Advanced

Debugging

Alternatively, you can print out the fields to obtain more information. All classes come with a __str__ method that generates a dictionary string which can be quite useful for debugging.

However, certain events may contain sensitive fields such as secret_access_key and session_token, which are labeled as [SENSITIVE] to prevent any accidental disclosure of confidential information.

If we fail to deserialize a field value (e.g., JSON), they will appear as [Cannot be deserialized]

1
2
3
4
5
6
7
8
9
from aws_lambda_powertools.utilities.data_classes import (
    CodePipelineJobEvent,
    event_source,
)


@event_source(data_class=CodePipelineJobEvent)
def lambda_handler(event, context):
    print(event)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
{
    "CodePipeline.job": {
        "id": "11111111-abcd-1111-abcd-111111abcdef",
        "accountId": "111111111111",
        "data": {
            "actionConfiguration": {
                "configuration": {
                    "FunctionName": "MyLambdaFunctionForAWSCodePipeline",
                    "UserParameters": "some-input-such-as-a-URL"
                }
            },
            "inputArtifacts": [
                {
                    "name": "ArtifactName",
                    "revision": null,
                    "location": {
                        "type": "S3",
                        "s3Location": {
                            "bucketName": "the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890",
                            "objectKey": "the name of the application, for example CodePipelineDemoApplication.zip"
                        }
                    }
                }
            ],
            "outputArtifacts": [],
            "artifactCredentials": {
                "accessKeyId": "AKIAIOSFODNN7EXAMPLE",
                "secretAccessKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
                "sessionToken": "MIICiTCCAfICCQD6m7oRw0uXOjANBgkqhkiG9w0BAQUFADCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wHhcNMTEwNDI1MjA0NTIxWhcNMTIwNDI0MjA0NTIxWjCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMaK0dn+a4GmWIWJ21uUSfwfEvySWtC2XADZ4nB+BLYgVIk60CpiwsZ3G93vUEIO3IyNoH/f0wYK8m9TrDHudUZg3qX4waLG5M43q7Wgc/MbQITxOUSQv7c7ugFFDzQGBzZswY6786m86gpEIbb3OhjZnzcvQAaRHhdlQWIMm2nrAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAtCu4nUhVVxYUntneD9+h8Mg9q6q+auNKyExzyLwaxlAoo7TJHidbtS4J5iNmZgXL0FkbFFBjvSfpJIlJ00zbhNYS5f6GuoEDmFJl0ZxBHjJnyp378OD8uTs7fLvjx79LjSTbNYiytVbZPQUQ5Yaxu2jXnimvw3rrszlaEXAMPLE="
            },
            "continuationToken": "A continuation token if continuing job"
        }
    }
  }

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
{
    "account_id":"111111111111",
    "data":{
       "action_configuration":{
          "configuration":{
             "decoded_user_parameters":"[Cannot be deserialized]",
             "function_name":"MyLambdaFunctionForAWSCodePipeline",
             "raw_event":"[SENSITIVE]",
             "user_parameters":"some-input-such-as-a-URL"
          },
          "raw_event":"[SENSITIVE]"
       },
       "artifact_credentials":{
          "access_key_id":"AKIAIOSFODNN7EXAMPLE",
          "expiration_time":"None",
          "raw_event":"[SENSITIVE]",
          "secret_access_key":"[SENSITIVE]",
          "session_token":"[SENSITIVE]"
       },
       "continuation_token":"A continuation token if continuing job",
       "encryption_key":"None",
       "input_artifacts":[
          {
             "location":{
                "get_type":"S3",
                "raw_event":"[SENSITIVE]",
                "s3_location":{
                   "bucket_name":"the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890",
                   "key":"the name of the application, for example CodePipelineDemoApplication.zip",
                   "object_key":"the name of the application, for example CodePipelineDemoApplication.zip",
                   "raw_event":"[SENSITIVE]"
                }
             },
             "name":"ArtifactName",
             "raw_event":"[SENSITIVE]",
             "revision":"None"
          }
       ],
       "output_artifacts":[

       ],
       "raw_event":"[SENSITIVE]"
    },
    "decoded_user_parameters":"[Cannot be deserialized]",
    "get_id":"11111111-abcd-1111-abcd-111111abcdef",
    "input_bucket_name":"the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890",
    "input_object_key":"the name of the application, for example CodePipelineDemoApplication.zip",
    "raw_event":"[SENSITIVE]",
    "user_parameters":"some-input-such-as-a-URL"
 }
```