Type hinting and code completion for common event types
Helper functions for decoding/deserializing nested fields
Docstrings for fields contained in event schemas
Background
When authoring Lambda functions, you often need to understand the schema of the event dictionary which is passed to the
handler. There are several common event types which follow a specific schema, depending on the service triggering the
Lambda function.
The classes are initialized by passing in the Lambda event object into the constructor of the appropriate data class or
by using the event_source decorator.
For example, if your Lambda function is being triggered by an API Gateway proxy integration, you can use the
APIGatewayProxyEvent class.
The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of
documentation inherently (via autocompletion, types and docstrings).
fromtypingimportDictfromaws_lambda_powertoolsimportLoggerfromaws_lambda_powertools.utilities.data_classesimportevent_sourcefromaws_lambda_powertools.utilities.data_classes.active_mq_eventimportActiveMQEventlogger=Logger()@event_source(data_class=ActiveMQEvent)deflambda_handler(event:ActiveMQEvent,context):formessageinevent.messages:logger.debug(f"MessageID: {message.message_id}")data:Dict=message.json_datalogger.debug("Process json in base64 encoded data str",data)
Use APIGatewayAuthorizerRequestEvent for type REQUEST and APIGatewayAuthorizerTokenEvent for type TOKEN.
This example uses the APIGatewayAuthorizerResponse to decline a given request if the user is not found.
When the user is found, it includes the user details in the request context that will be available to the back-end, and returns a full access policy for admin users.
fromaws_lambda_powertools.utilities.data_classesimportevent_sourcefromaws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_eventimport(DENY_ALL_RESPONSE,APIGatewayAuthorizerRequestEvent,APIGatewayAuthorizerResponse,HttpVerb,)fromsecretsimportcompare_digestdefget_user_by_token(token):ifcompare_digest(token,"admin-foo"):return{"id":0,"name":"Admin","isAdmin":True}elifcompare_digest(token,"regular-foo"):return{"id":1,"name":"Joe"}else:returnNone@event_source(data_class=APIGatewayAuthorizerRequestEvent)defhandler(event:APIGatewayAuthorizerRequestEvent,context):user=get_user_by_token(event.headers["Authorization"])ifuserisNone:# No user was found# to return 401 - `{"message":"Unauthorized"}`, but pollutes lambda error count metrics# raise Exception("Unauthorized")# to return 403 - `{"message":"Forbidden"}`returnDENY_ALL_RESPONSE# parse the `methodArn` as an `APIGatewayRouteArn`arn=event.parsed_arn# Create the response builder from parts of the `methodArn`# and set the logged in user id and contextpolicy=APIGatewayAuthorizerResponse(principal_id=user["id"],context=user,region=arn.region,aws_account_id=arn.aws_account_id,api_id=arn.api_id,stage=arn.stage,)# Conditional IAM Policyifuser.get("isAdmin",False):policy.allow_all_routes()else:policy.allow_route(HttpVerb.GET.value,"/user-profile")returnpolicy.asdict()
This example looks up user details via x-token header. It uses APIGatewayAuthorizerResponseV2 to return a deny policy when user is not found or authorized.
1 2 3 4 5 6 7 8 9101112131415161718192021222324
fromaws_lambda_powertools.utilities.data_classesimportevent_sourcefromaws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_eventimport(APIGatewayAuthorizerEventV2,APIGatewayAuthorizerResponseV2,)fromsecretsimportcompare_digestdefget_user_by_token(token):ifcompare_digest(token,"Foo"):return{"name":"Foo"}returnNone@event_source(data_class=APIGatewayAuthorizerEventV2)defhandler(event:APIGatewayAuthorizerEventV2,context):user=get_user_by_token(event.headers["x-token"])ifuserisNone:# No user was found, so we return not authorizedreturnAPIGatewayAuthorizerResponseV2().asdict()# Found the user and setting the details in the contextreturnAPIGatewayAuthorizerResponseV2(authorize=True,context=user).asdict()
In this example extract the requestId as the correlation_id for logging, used @event_source decorator and builds the AppSync authorizer using the AppSyncAuthorizerResponse helper.
fromtypingimportDictfromaws_lambda_powertools.loggingimportcorrelation_pathsfromaws_lambda_powertools.logging.loggerimportLoggerfromaws_lambda_powertools.utilities.data_classes.appsync_authorizer_eventimport(AppSyncAuthorizerEvent,AppSyncAuthorizerResponse,)fromaws_lambda_powertools.utilities.data_classes.event_sourceimportevent_sourcelogger=Logger()defget_user_by_token(token:str):"""Look a user by token"""...@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER)@event_source(data_class=AppSyncAuthorizerEvent)deflambda_handler(event:AppSyncAuthorizerEvent,context)->Dict:user=get_user_by_token(event.authorization_token)ifnotuser:# No user found, return not authorizedreturnAppSyncAuthorizerResponse().asdict()returnAppSyncAuthorizerResponse(authorize=True,resolver_context={"id":user.id},# Only allow admins to delete eventsdeny_fields=Noneifuser.is_adminelse["Mutation.deleteEvent"],).asdict()
In this example, we also use the new Logger correlation_id and built-in correlation_paths to extract, if available, X-Ray Trace ID in AppSync request headers:
fromaws_lambda_powertools.loggingimportLogger,correlation_pathsfromaws_lambda_powertools.utilities.data_classes.appsync_resolver_eventimport(AppSyncResolverEvent,AppSyncIdentityCognito)logger=Logger()defget_locations(name:str=None,size:int=0,page:int=0):"""Your resolver logic here"""@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)deflambda_handler(event,context):event:AppSyncResolverEvent=AppSyncResolverEvent(event)# Case insensitive look up of request headersx_forwarded_for=event.headers.get("x-forwarded-for")# Support for AppSyncIdentityCognito or AppSyncIdentityIAM identity typesassertisinstance(event.identity,AppSyncIdentityCognito)identity:AppSyncIdentityCognito=event.identity# Logging with correlation_idlogger.debug({"x-forwarded-for":x_forwarded_for,"username":identity.username})ifevent.type_name=="Merchant"andevent.field_name=="locations":returnget_locations(**event.arguments)raiseValueError(f"Unsupported field resolver: {event.field_name}")
{"version":"1.0","invokingEvent":"{\"configurationItemDiff\":{\"changedProperties\":{\"Configuration.InstanceType\":{\"previousValue\":\"t2.micro\",\"updatedValue\":\"t2.medium\",\"changeType\":\"UPDATE\"},\"Configuration.State.Name\":{\"previousValue\":\"running\",\"updatedValue\":\"stopped\",\"changeType\":\"UPDATE\"},\"Configuration.StateTransitionReason\":{\"previousValue\":\"\",\"updatedValue\":\"User initiated (2023-04-27 15:01:07 GMT)\",\"changeType\":\"UPDATE\"},\"Configuration.StateReason\":{\"previousValue\":null,\"updatedValue\":{\"code\":\"Client.UserInitiatedShutdown\",\"message\":\"Client.UserInitiatedShutdown: User initiated shutdown\"},\"changeType\":\"CREATE\"},\"Configuration.CpuOptions.CoreCount\":{\"previousValue\":1,\"updatedValue\":2,\"changeType\":\"UPDATE\"}},\"changeType\":\"UPDATE\"},\"configurationItem\":{\"relatedEvents\":[],\"relationships\":[{\"resourceId\":\"eipalloc-0ebb4367662263cc1\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::EIP\",\"name\":\"Is attached to ElasticIp\"},{\"resourceId\":\"eni-034dd31c4b17ada8c\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::NetworkInterface\",\"name\":\"Contains NetworkInterface\"},{\"resourceId\":\"eni-09a604c0ec356b06f\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::NetworkInterface\",\"name\":\"Contains NetworkInterface\"},{\"resourceId\":\"sg-0fb295a327d9b4835\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::SecurityGroup\",\"name\":\"Is associated with SecurityGroup\"},{\"resourceId\":\"subnet-cad1f2f4\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::Subnet\",\"name\":\"Is contained in Subnet\"},{\"resourceId\":\"vol-0a288b5eb9fea4b30\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::Volume\",\"name\":\"Is attached to Volume\"},{\"resourceId\":\"vpc-2d96be57\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::VPC\",\"name\":\"Is contained in Vpc\"}],\"configuration\":{\"amiLaunchIndex\":0,\"imageId\":\"ami-09d95fab7fff3776c\",\"instanceId\":\"i-042dd005362091826\",\"instanceType\":\"t2.medium\",\"kernelId\":null,\"keyName\":\"mihaec2\",\"launchTime\":\"2023-04-27T14:57:16.000Z\",\"monitoring\":{\"state\":\"disabled\"},\"placement\":{\"availabilityZone\":\"us-east-1e\",\"affinity\":null,\"groupName\":\"\",\"partitionNumber\":null,\"hostId\":null,\"tenancy\":\"default\",\"spreadDomain\":null,\"hostResourceGroupArn\":null},\"platform\":null,\"privateDnsName\":\"ip-172-31-78-41.ec2.internal\",\"privateIpAddress\":\"172.31.78.41\",\"productCodes\":[],\"publicDnsName\":\"ec2-3-232-229-57.compute-1.amazonaws.com\",\"publicIpAddress\":\"3.232.229.57\",\"ramdiskId\":null,\"state\":{\"code\":80,\"name\":\"stopped\"},\"stateTransitionReason\":\"User initiated (2023-04-27 15:01:07 GMT)\",\"subnetId\":\"subnet-cad1f2f4\",\"vpcId\":\"vpc-2d96be57\",\"architecture\":\"x86_64\",\"blockDeviceMappings\":[{\"deviceName\":\"/dev/xvda\",\"ebs\":{\"attachTime\":\"2020-05-30T15:21:58.000Z\",\"deleteOnTermination\":true,\"status\":\"attached\",\"volumeId\":\"vol-0a288b5eb9fea4b30\"}}],\"clientToken\":\"\",\"ebsOptimized\":false,\"enaSupport\":true,\"hypervisor\":\"xen\",\"iamInstanceProfile\":{\"arn\":\"arn:aws:iam::0123456789012:instance-profile/AmazonSSMRoleForInstancesQuickSetup\",\"id\":\"AIPAS5S4WFUBL72S3QXW5\"},\"instanceLifecycle\":null,\"elasticGpuAssociations\":[],\"elasticInferenceAcceleratorAssociations\":[],\"networkInterfaces\":[{\"association\":{\"carrierIp\":null,\"ipOwnerId\":\"0123456789012\",\"publicDnsName\":\"ec2-3-232-229-57.compute-1.amazonaws.com\",\"publicIp\":\"3.232.229.57\"},\"attachment\":{\"attachTime\":\"2020-05-30T15:21:57.000Z\",\"attachmentId\":\"eni-attach-0a7e75dc9c1c291a0\",\"deleteOnTermination\":true,\"deviceIndex\":0,\"status\":\"attached\",\"networkCardIndex\":0},\"description\":\"\",\"groups\":[{\"groupName\":\"minhaec2\",\"groupId\":\"sg-0fb295a327d9b4835\"}],\"ipv6Addresses\":[],\"macAddress\":\"06:cf:00:c2:17:db\",\"networkInterfaceId\":\"eni-034dd31c4b17ada8c\",\"ownerId\":\"0123456789012\",\"privateDnsName\":\"ip-172-31-78-41.ec2.internal\",\"privateIpAddress\":\"172.31.78.41\",\"privateIpAddresses\":[{\"association\":{\"carrierIp\":null,\"ipOwnerId\":\"0123456789012\",\"publicDnsName\":\"ec2-3-232-229-57.compute-1.amazonaws.com\",\"publicIp\":\"3.232.229.57\"},\"primary\":true,\"privateDnsName\":\"ip-172-31-78-41.ec2.internal\",\"privateIpAddress\":\"172.31.78.41\"}],\"sourceDestCheck\":true,\"status\":\"in-use\",\"subnetId\":\"subnet-cad1f2f4\",\"vpcId\":\"vpc-2d96be57\",\"interfaceType\":\"interface\"},{\"association\":null,\"attachment\":{\"attachTime\":\"2020-11-26T23:46:04.000Z\",\"attachmentId\":\"eni-attach-0e6d150ebbd19966e\",\"deleteOnTermination\":false,\"deviceIndex\":1,\"status\":\"attached\",\"networkCardIndex\":0},\"description\":\"MINHAEC2AAAAAA\",\"groups\":[{\"groupName\":\"minhaec2\",\"groupId\":\"sg-0fb295a327d9b4835\"},{\"groupName\":\"default\",\"groupId\":\"sg-88105fa0\"}],\"ipv6Addresses\":[],\"macAddress\":\"06:0a:62:00:64:5f\",\"networkInterfaceId\":\"eni-09a604c0ec356b06f\",\"ownerId\":\"0123456789012\",\"privateDnsName\":\"ip-172-31-70-9.ec2.internal\",\"privateIpAddress\":\"172.31.70.9\",\"privateIpAddresses\":[{\"association\":null,\"primary\":true,\"privateDnsName\":\"ip-172-31-70-9.ec2.internal\",\"privateIpAddress\":\"172.31.70.9\"}],\"sourceDestCheck\":true,\"status\":\"in-use\",\"subnetId\":\"subnet-cad1f2f4\",\"vpcId\":\"vpc-2d96be57\",\"interfaceType\":\"interface\"}],\"outpostArn\":null,\"rootDeviceName\":\"/dev/xvda\",\"rootDeviceType\":\"ebs\",\"securityGroups\":[{\"groupName\":\"minhaec2\",\"groupId\":\"sg-0fb295a327d9b4835\"}],\"sourceDestCheck\":true,\"spotInstanceRequestId\":null,\"sriovNetSupport\":null,\"stateReason\":{\"code\":\"Client.UserInitiatedShutdown\",\"message\":\"Client.UserInitiatedShutdown: User initiated shutdown\"},\"tags\":[{\"key\":\"projeto\",\"value\":\"meetup\"},{\"key\":\"Name\",\"value\":\"Minha\"},{\"key\":\"CentroCusto\",\"value\":\"TI\"},{\"key\":\"Setor\",\"value\":\"Desenvolvimento\"}],\"virtualizationType\":\"hvm\",\"cpuOptions\":{\"coreCount\":2,\"threadsPerCore\":1},\"capacityReservationId\":null,\"capacityReservationSpecification\":{\"capacityReservationPreference\":\"open\",\"capacityReservationTarget\":null},\"hibernationOptions\":{\"configured\":false},\"licenses\":[],\"metadataOptions\":{\"state\":\"applied\",\"httpTokens\":\"optional\",\"httpPutResponseHopLimit\":1,\"httpEndpoint\":\"enabled\"},\"enclaveOptions\":{\"enabled\":false},\"bootMode\":null},\"supplementaryConfiguration\":{},\"tags\":{\"projeto\":\"meetup\",\"Setor\":\"Desenvolvimento\",\"CentroCusto\":\"TI\",\"Name\":\"Minha\"},\"configurationItemVersion\":\"1.3\",\"configurationItemCaptureTime\":\"2023-04-27T15:03:11.636Z\",\"configurationStateId\":1682607791636,\"awsAccountId\":\"0123456789012\",\"configurationItemStatus\":\"OK\",\"resourceType\":\"AWS::EC2::Instance\",\"resourceId\":\"i-042dd005362091826\",\"resourceName\":null,\"ARN\":\"arn:aws:ec2:us-east-1:0123456789012:instance/i-042dd005362091826\",\"awsRegion\":\"us-east-1\",\"availabilityZone\":\"us-east-1e\",\"configurationStateMd5Hash\":\"\",\"resourceCreationTime\":\"2023-04-27T14:57:16.000Z\"},\"notificationCreationTime\":\"2023-04-27T15:03:13.332Z\",\"messageType\":\"ConfigurationItemChangeNotification\",\"recordVersion\":\"1.3\"}","ruleParameters":"{\"desiredInstanceType\": \"t2.micro\"}","resultToken":"eyJlbmNyeXB0ZWREYXRhIjpbLTQxLDEsLTU3LC0zMCwtMTIxLDUzLDUyLDQ1LC01NywtOCw3MywtODEsLTExNiwtMTAyLC01MiwxMTIsLTQ3LDU4LDY1LC0xMjcsMTAyLDUsLTY5LDQ0LC0xNSwxMTQsNDEsLTksMTExLC0zMCw2NSwtNzUsLTM1LDU0LDEwNSwtODksODYsNDAsLTEwNSw5OCw2NSwtMTE5LC02OSwyNCw2NiwtMjAsODAsLTExMiwtNzgsLTgwLDQzLC01NywzMCwtMjUsODIsLTEwLDMsLTQsLTg1LC01MywtMzcsLTkwLC04OCwtOTgsLTk4LC00MSwxOSwxMTYsNjIsLTIzLC0xMjEsLTEwOCw1NywtNTgsLTUyLDI5LDEwMSwxMjIsLTU2LC03MSwtODEsLTQ3LDc3LC0yMiwtMTI0LC0zLC04NiwtMTIyLC00MCwtODksLTEwMSw1NywtMTI3LC0zNywtMzcsLTMxLC05OCwtMzEsMTEsLTEyNSwwLDEwOCwtMzIsNjQsNjIsLTIyLDAsNDcsLTEwNiwtMTAwLDEwNCwxNCw1OCwxMjIsLTEwLC01MCwtOTAsLTgwLC01MCwtNSw2NSwwLC0yNSw4NSw4Miw3LDkzLDEyMiwtODIsLTExNiwtNzksLTQ0LDcyLC03MywtNjksMTQsLTU2LDk0LDkwLDExNCwtMjksLTExOSwtNzEsODgsMTA3LDEwNywxMTAsLTcsMTI3LC0xMjUsLTU3LC0xMjYsLTEyMCw2OSwtMTI3LC03NiwtMTE5LDcxLDEsLTY4LDEwNywxMTMsLTU2LDg3LC0xMDIsLTE2LDEwOCwtMTA3LC00MywtOTQsLTEwNiwzLDkwLDE0LDcyLC0xMiwtMTE2LC03Myw4MCwtMTIyLDQ0LC0xMDQsMTIsNzQsNTcsLTEwLC0xMDUsLTExMiwtMzYsMjgsLTQ1LDk3LDExLC00OSwtMTEsNjEsMzYsLTE3LC03NCw1MCw0LC0yNiwxMDQsLTI4LC0xMjUsMjQsNzAsLTg1LC00Niw5MiwtMTAzLC00MSwtMTA2LDY5LDEyMiwyMSwtMjUsODAsOTksLTkzLC01NiwtMjUsLTQ3LC0xMjMsLTU5LC0xMjQsLTUyLC0xNiwxMjcsLTM4LC0xNiwxMDEsMTE5LDEwNywyNywxMCwtNDYsLTg3LC0xMiwtMzksMTQsNDUsMiw3MCwxMDcsMTA0LC00LC02OSwtMTIsNTksLTEyNiwtOTEsMTI3LDU0LDEwNiwtMTI2LC0xMTYsLTEwMiw3Miw4MSw1MCw3NSwtNTEsMTA4LDQxLC0zLC02LC00NSwxMDMsLTg2LDM3LC00NiwtMzIsLTExMSwxMjQsMTExLDg3LDU0LC03NiwxMjIsLTUsLTM2LC04OCw5LC0xMTMsMTE2LC01OSw4Myw3NywyOCwxMiwtNjUsLTExMywtNzksLTEyOCw4MiwtMTE4LC04MywtMTI0LDMxLDk5LC05MCwtOTksMTYsLTEyMywyMSwtMTE0LC05OCwtMTE2LC0xMTksMiwtNzMsNDYsODIsLTEzLDU0LDcxLC00MiwyNSw3NCw3MywtODYsOTQsNDYsOTksOTMsLTgyLDU1LDY1LC05OCw0OSwtNjAsMTEyLDEwMSwyMiw2OSwtMTYsNzcsLTk0LC01OSwtNDYsMTE1LDMwLC00Myw5Myw4OCwtMjgsMzgsNiw4NCwzMSwtMTAxLDMyLC0yMiwtNjMsLTk1LDExNCwtNzUsMTE0LDM2LC04NCw0MCwtNDQsLTEzLDU5LDcyLC0xLC0xMDMsMzEsMTA1LDY5LDY5LDc3LC02NCwtNTYsMTE4LDEzLC0xMTQsODAsOTksLTUzLDI1LDQyLDk0LDczLC04MCwyNSwzOCwyNCwtMTcsNjYsLTExOCwtMjMsMTE5LDkwLDEyMSwxMTgsLTUxLDUxLC0xMiwtNzYsLTUxLDksLTIxLDExNCwtMzcsLTY0LC0yLC0xMjYsLTk1LDYzLDczLC00MSwtMzQsLTkwLC0yMiw1OSwtNzksMzAsLTQsLTEsLTUsMTIsMzksLTk5LC0xMDUsLTEwNCwtNjEsNjUsLTc0LDE5LC0xMywtNjAsLTI4LC04LDQsLTgsMTIxLC0xMTgsMTIyLC02NSwtMjEsMjMsMTcsLTg0LDQwLC05MiwxNCwtMTI2LC02MCwtNzksLTUzLDM3LC04Myw2NSwxMDQsLTM2LC02MCwtMTEwLC0zMywtMTE3LDYsMTA3LDEsLTMsOTMsNzgsLTk1LC0xMjIsNTMsMTA4LC00OSwtNDksMjQsLTY1LDgzLDEyNSwtNzcsLTE5LC04MSwzNCwtNjcsLTQzLC03MCwtMjYsMTgsMTA0LDY1LDQsLTEyNiw0NCwtMTE5LDUyLC00NiwyMiw2NywxMTMsMTE4LC0zMywzNCwtOTYsMTIxLDE5LC0yLC0zNSwwLC04MiwxNyw2NiwtMjcsNjksLTM2LC0xNCw1NiwtOTcsLTE2LDEyMywyOCwtOTUsLTMyLC02MywtNjksNzAsNjQsLTMzLC0xMDAsNDMsLTExMywxMDUsMTAwLDEwOCwtNjAsNDAsLTIsLTk2LC0xMjQsMzcsLTQ1LC0xMjQsLTY4LC02OSwtMTIzLDE3LC02LDg2LC01OSwtOTQsMTEwLDczLDU3LC0xMTYsMTA3LC00MSwtOTQsLTExOCwtMTI2LDEwLC04MCwtNzAsMTAyLDg4LC0xMjYsODcsLTI3LC0xMDEsLTk0LC0zNSwtMTA2LC02LC03MiwtODYsNTAsMTE2LC0yOCw5MCwxMywtMTIwLDYsMjcsOTIsNTYsLTkwLDM5LDQ5LC0xMywtODYsLTI1LC04NiwxMTMsLTEzLDQxLC0xMTksOTQsLTk0LC0xMDMsLTgzLC02MCwxMjcsLTE1LC0zOSwxMTksLTk1LDI3LDQ0LDExNiwxMDksNywtMTAyLC0xNyw0OCwtODIsLTMxLC04LC02OSwzNSw5NCw1NCwtNTUsMSwtMTE5LDU3LC0xMDgsLTMsLTkxLC0xMjIsLTUzLC04OCw0LC05NywtMzUsMTI2LDExOSw1OSwtMSw4NSw3MywtNTgsLTEyMCwtNjQsMTE5LC0xMTIsOTIsMTksOSwtNjYsLTkyLDEwOCwtMTEsLTQyLDExMSwtMTA0LC0xMjAsMjcsLTEwMywtNjksMTksMTExLDEyLDIzLDEwNyw1NCw0MSwtMjYsNjAsLTMxLC01XSwibWF0ZXJpYWxTZXRTZXJpYWxOdW1iZXIiOjEsIml2UGFyYW1ldGVyU3BlYyI6eyJpdiI6Wy05NSwzMiwxMDgsOTEsMzUsLTgyLC0zNywyNCwtNDQsLTExNSwtODIsLTEyOCwtMTIyLDMsNTMsLTI0XX19","eventLeftScope":false,"executionRoleArn":"arn:aws:iam::0123456789012:role/aws-service-role/config.amazonaws.com/AWSServiceRoleForConfig","configRuleArn":"arn:aws:config:us-east-1:0123456789012:config-rule/config-rule-i9y8j9","configRuleName":"MyRule","configRuleId":"config-rule-i9y8j9","accountId":"0123456789012","evaluationMode":"DETECTIVE"}
fromaws_lambda_powertoolsimportLoggerfromaws_lambda_powertools.utilities.data_classesimportBedrockAgentEvent,event_sourcefromaws_lambda_powertools.utilities.typingimportLambdaContextlogger=Logger()@event_source(data_class=BedrockAgentEvent)deflambda_handler(event:BedrockAgentEvent,context:LambdaContext)->dict:input_text=event.input_textlogger.info(f"Bedrock Agent {event.action_group} invoked with input",input_text=input_text)return{"message_version":"1.0","responses":[{"action_group":event.action_group,"api_path":event.api_path,"http_method":event.http_method,"http_status_code":200,"response_body":{"application/json":{"body":"This is the response"}},},],}
fromaws_lambda_powertoolsimportLoggerfromaws_lambda_powertools.utilities.data_classesimport(CloudFormationCustomResourceEvent,event_source,)fromaws_lambda_powertools.utilities.typingimportLambdaContextlogger=Logger()@event_source(data_class=CloudFormationCustomResourceEvent)deflambda_handler(event:CloudFormationCustomResourceEvent,context:LambdaContext):request_type=event.request_typeifrequest_type=="Create":returnon_create(event)ifrequest_type=="Update":returnon_update(event)ifrequest_type=="Delete":returnon_delete(event)defon_create(event:CloudFormationCustomResourceEvent):props=event.resource_propertieslogger.info(f"Create new resource with props {props}.")# Add your create code here ...physical_id=...return{"PhysicalResourceId":physical_id}defon_update(event:CloudFormationCustomResourceEvent):physical_id=event.physical_resource_idprops=event.resource_propertieslogger.info(f"Update resource {physical_id} with props {props}.")# ...defon_delete(event:CloudFormationCustomResourceEvent):physical_id=event.physical_resource_idlogger.info(f"Delete resource {physical_id}.")# ...
fromaws_lambda_powertools.utilities.data_classesimportevent_source,CloudWatchDashboardCustomWidgetEventconstDOCS=`## EchoAsimpleechoscript.Anythingpassedin \`\`\`echo\`\`\`parameterisreturnedasthecontentofcustomwidget.### Widget parameters|Param|Description||--------|------------------------||**echo**|Thecontenttoechoback|### Example parameters
\`\`\`yamlecho:<h1>Helloworld</h1>
\`\`\``@event_source(data_class=CloudWatchDashboardCustomWidgetEvent)deflambda_handler(event:CloudWatchDashboardCustomWidgetEvent,context):ifevent.describe:returnDOCS# You can directly return HTML or JSON content# Alternatively, you can return markdown that will be rendered by CloudWatchecho=event.widget_context.params["echo"]return{"markdown":f"# {echo}"}
fromaws_lambda_powertoolsimportLoggerfromaws_lambda_powertools.utilities.data_classesimportCloudWatchAlarmEvent,event_sourcefromaws_lambda_powertools.utilities.typingimportLambdaContextlogger=Logger()@event_source(data_class=CloudWatchAlarmEvent)deflambda_handler(event:CloudWatchAlarmEvent,context:LambdaContext)->dict:logger.info(f"Alarm {event.alarm_data.alarm_name} state is {event.alarm_data.state.value}")# You can now work with event. For example, you can enrich the received data, and# decide on how you want to route the alarm.return{"name":event.alarm_data.alarm_name,"arn":event.alarm_arn,"urgent":"Priority: P1"in(event.alarm_data.configuration.descriptionor""),}
CloudWatch Logs events by default are compressed and base64 encoded. You can use the helper function provided to decode,
decompress and parse json data from the event.
fromtypingimportListfromaws_lambda_powertools.utilities.data_classesimportevent_sourcefromaws_lambda_powertools.utilities.data_classes.cloud_watch_logs_eventimportCloudWatchLogsDecodedDatafromaws_lambda_powertools.utilities.data_classes.kinesis_stream_eventimport(KinesisStreamEvent,extract_cloudwatch_logs_from_event)@event_source(data_class=KinesisStreamEvent)defsimple_handler(event:KinesisStreamEvent,context):logs:List[CloudWatchLogsDecodedData]=extract_cloudwatch_logs_from_event(event)forloginlogs:iflog.message_type=="DATA_MESSAGE":return"success"return"nothing to be processed"
Alternatively, you can use extract_cloudwatch_logs_from_record to seamless integrate with the Batch utility for more robust log processing.
fromaws_lambda_powertoolsimportLoggerfromaws_lambda_powertools.utilities.data_classesimportevent_source,CodePipelineJobEventlogger=Logger()@event_source(data_class=CodePipelineJobEvent)deflambda_handler(event,context):"""The Lambda function handler If a continuing job then checks the CloudFormation stack status and updates the job accordingly. If a new job then kick of an update or creation of the target CloudFormation stack. """# Extract the Job IDjob_id=event.get_id# Extract the paramsparams:dict=event.decoded_user_parametersstack=params["stack"]artifact_name=params["artifact"]template_file=params["file"]try:ifevent.data.continuation_token:# If we're continuing then the create/update has already been triggered# we just need to check if it has finished.check_stack_update_status(job_id,stack)else:template=event.get_artifact(artifact_name,template_file)# Kick off a stack update or createstart_update_or_create(job_id,stack,template)exceptExceptionase:# If any other exceptions which we didn't expect are raised# then fail the job and log the exception message.logger.exception("Function failed due to exception.")put_job_failure(job_id,"Function exception: "+str(e))logger.debug("Function complete.")return"Complete."
Cognito User Pools have several different Lambda trigger sources, all of which map to a different data class, which
can be imported from aws_lambda_powertools.data_classes.cognito_user_pool_event:
The DynamoDB data class utility provides the base class for DynamoDBStreamEvent, as well as enums for stream view type (StreamViewType) and event type.
(DynamoDBRecordEventName).
The class automatically deserializes DynamoDB types into their equivalent Python types.
1 2 3 4 5 6 7 8 910111213
fromaws_lambda_powertools.utilities.data_classes.dynamo_db_stream_eventimport(DynamoDBStreamEvent,DynamoDBRecordEventName)deflambda_handler(event,context):event:DynamoDBStreamEvent=DynamoDBStreamEvent(event)# Multiple records can be delivered in a single eventforrecordinevent.records:ifrecord.event_name==DynamoDBRecordEventName.MODIFY:do_something_with(record.dynamodb.new_image)do_something_with(record.dynamodb.old_image)
Kinesis events by default contain base64 encoded data. You can use the helper function to access the data either as json
or plain text, depending on the original payload.
1 2 3 4 5 6 7 8 910111213
fromaws_lambda_powertools.utilities.data_classesimportevent_source,KinesisStreamEvent@event_source(data_class=KinesisStreamEvent)deflambda_handler(event:KinesisStreamEvent,context):kinesis_record=next(event.records).kinesis# if data was delivered as textdata=kinesis_record.data_as_text()# if data was delivered as jsondata=kinesis_record.data_as_json()do_something_with(data)
When using Kinesis Firehose, you can use a Lambda function to perform data transformation. For each transformed record, you can choose to either:
A) Put them back to the delivery stream (default)
B) Drop them so consumers don't receive them (e.g., data validation)
C) Indicate a record failed data transformation and should be retried
To do that, you can use KinesisFirehoseDataTransformationResponse class along with helper functions to make it easier to decode and encode base64 data in the stream.
fromaws_lambda_powertools.utilities.data_classesimport(KinesisFirehoseDataTransformationResponse,KinesisFirehoseEvent,event_source,)fromaws_lambda_powertools.utilities.serializationimportbase64_from_jsonfromaws_lambda_powertools.utilities.typingimportLambdaContext@event_source(data_class=KinesisFirehoseEvent)deflambda_handler(event:KinesisFirehoseEvent,context:LambdaContext):result=KinesisFirehoseDataTransformationResponse()forrecordinevent.records:# get original data using data_as_text propertydata=record.data_as_text# (1)!## generate data to returntransformed_data={"new_data":"transformed data using Powertools","original_payload":data}processed_record=record.build_data_transformation_response(data=base64_from_json(transformed_data),# (2)!)result.add_record(processed_record)# return transformed recordsreturnresult.asdict()
Ingesting JSON payloads?
Use record.data_as_json to easily deserialize them.
For your convenience, base64_from_json serializes a dict to JSON, then encode as base64 data.
fromjsonimportJSONDecodeErrorfromtypingimportDictfromaws_lambda_powertools.utilities.data_classesimport(KinesisFirehoseDataTransformationRecord,KinesisFirehoseDataTransformationResponse,KinesisFirehoseEvent,event_source,)fromaws_lambda_powertools.utilities.serializationimportbase64_from_jsonfromaws_lambda_powertools.utilities.typingimportLambdaContext@event_source(data_class=KinesisFirehoseEvent)deflambda_handler(event:KinesisFirehoseEvent,context:LambdaContext):result=KinesisFirehoseDataTransformationResponse()forrecordinevent.records:try:payload:Dict=record.data_as_json# decodes and deserialize base64 JSON string## generate data to returntransformed_data={"tool_used":"powertools_dataclass","original_payload":payload}processed_record=KinesisFirehoseDataTransformationRecord(record_id=record.record_id,data=base64_from_json(transformed_data),)exceptJSONDecodeError:# (1)!# our producers ingest JSON payloads only; drop malformed records from the streamprocessed_record=KinesisFirehoseDataTransformationRecord(record_id=record.record_id,data=record.data,result="Dropped",)result.add_record(processed_record)# return transformed recordsreturnresult.asdict()
This exception would be generated from record.data_as_json if invalid payload.
fromaws_lambda_powertools.utilities.data_classesimport(KinesisFirehoseDataTransformationRecord,KinesisFirehoseDataTransformationResponse,KinesisFirehoseEvent,event_source,)fromaws_lambda_powertools.utilities.serializationimportbase64_from_jsonfromaws_lambda_powertools.utilities.typingimportLambdaContext@event_source(data_class=KinesisFirehoseEvent)deflambda_handler(event:dict,context:LambdaContext):firehose_event=KinesisFirehoseEvent(event)result=KinesisFirehoseDataTransformationResponse()forrecordinfirehose_event.records:try:payload=record.data_as_text# base64 decoded data as str# generate data to returntransformed_data={"tool_used":"powertools_dataclass","original_payload":payload}# Default result is Okprocessed_record=KinesisFirehoseDataTransformationRecord(record_id=record.record_id,data=base64_from_json(transformed_data),)exceptException:# add Failed result to processing results, send back to kinesis for retryprocessed_record=KinesisFirehoseDataTransformationRecord(record_id=record.record_id,data=record.data,result="ProcessingFailed",# (1)!)result.add_record(processed_record)# return transformed recordsreturnresult.asdict()
fromtypingimportDictfromaws_lambda_powertoolsimportLoggerfromaws_lambda_powertools.utilities.data_classesimportevent_sourcefromaws_lambda_powertools.utilities.data_classes.rabbit_mq_eventimportRabbitMQEventlogger=Logger()@event_source(data_class=RabbitMQEvent)deflambda_handler(event:RabbitMQEvent,context):forqueue_name,messagesinevent.rmq_messages_by_queue.items():logger.debug(f"Messages for queue: {queue_name}")formessageinmessages:logger.debug(f"MessageID: {message.basic_properties.message_id}")data:Dict=message.json_datalogger.debug("Process json in base64 encoded data str",data)
fromurllib.parseimportunquote_plusfromaws_lambda_powertools.utilities.data_classesimportevent_source,S3Event@event_source(data_class=S3Event)deflambda_handler(event:S3Event,context):bucket_name=event.bucket_name# Multiple records can be delivered in a single eventforrecordinevent.records:object_key=unquote_plus(record.s3.get_object.key)do_something_with(f"{bucket_name}/{object_key}")
importboto3frombotocore.exceptionsimportClientErrorfromaws_lambda_powertools.utilities.data_classesimportS3BatchOperationEvent,S3BatchOperationResponse,event_sourcefromaws_lambda_powertools.utilities.typingimportLambdaContext@event_source(data_class=S3BatchOperationEvent)deflambda_handler(event:S3BatchOperationEvent,context:LambdaContext):response=S3BatchOperationResponse(event.invocation_schema_version,event.invocation_id,"PermanentFailure")task=event.tasksrc_key:str=task.s3_keysrc_bucket:str=task.s3_buckets3=boto3.client("s3",region_name="us-east-1")try:dest_bucket,dest_key=do_some_work(s3,src_bucket,src_key)result=task.build_task_batch_response("Succeeded",f"s3://{dest_bucket}/{dest_key}")exceptClientErrorase:error_code=e.response["Error"]["Code"]error_message=e.response["Error"]["Message"]iferror_code=="RequestTimeout":result=task.build_task_batch_response("TemporaryFailure","Retry request to Amazon S3 due to timeout.")else:result=task.build_task_batch_response("PermanentFailure",f"{error_code}: {error_message}")exceptExceptionase:result=task.build_task_batch_response("PermanentFailure",str(e))finally:response.add_result(result)returnresponse.asdict()defdo_some_work(s3_client,src_bucket:str,src_key:str):...
importboto3importrequestsfromaws_lambda_powertoolsimportLoggerfromaws_lambda_powertools.logging.correlation_pathsimportS3_OBJECT_LAMBDAfromaws_lambda_powertools.utilities.data_classes.s3_object_eventimportS3ObjectLambdaEventlogger=Logger()session=boto3.session.Session()s3=session.client("s3")@logger.inject_lambda_context(correlation_id_path=S3_OBJECT_LAMBDA,log_event=True)deflambda_handler(event,context):event=S3ObjectLambdaEvent(event)# Get object from S3response=requests.get(event.input_s3_url)original_object=response.content.decode("utf-8")# Make changes to the object about to be returnedtransformed_object=original_object.upper()# Write object back to S3 Object Lambdas3.write_get_object_response(Body=transformed_object,RequestRoute=event.request_route,RequestToken=event.request_token)return{"status_code":200}
AWS Secrets Manager rotation uses an AWS Lambda function to update the secret. Click here for more information about rotating AWS Secrets Manager secrets.
1 2 3 4 5 6 7 8 910111213141516
fromaws_lambda_powertools.utilitiesimportparametersfromaws_lambda_powertools.utilities.data_classesimportSecretsManagerEvent,event_sourcesecrets_provider=parameters.SecretsProvider()@event_source(data_class=SecretsManagerEvent)deflambda_handler(event:SecretsManagerEvent,context):# Getting secret value using Parameter utility# See https://docs.powertools.aws.dev/lambda/python/latest/utilities/parameters/secret=secrets_provider.get(event.secret_id,VersionId=event.version_id,VersionStage="AWSCURRENT")# You need to work with secrets afterwards# Check more examples: https://github.com/aws-samples/aws-secrets-manager-rotation-lambdasreturnsecret
fromaws_lambda_powertools.utilities.data_classesimportevent_source,SESEvent@event_source(data_class=SESEvent)deflambda_handler(event:SESEvent,context):# Multiple records can be delivered in a single eventforrecordinevent.records:mail=record.ses.mailcommon_headers=mail.common_headersdo_something_with(common_headers.to,common_headers.subject)
fromaws_lambda_powertools.utilities.data_classesimportevent_source,SNSEvent@event_source(data_class=SNSEvent)deflambda_handler(event:SNSEvent,context):# Multiple records can be delivered in a single eventforrecordinevent.records:message=record.sns.messagesubject=record.sns.subjectdo_something_with(subject,message)
fromaws_lambda_powertools.utilities.data_classesimportevent_source,SQSEvent@event_source(data_class=SQSEvent)deflambda_handler(event:SQSEvent,context):# Multiple records can be delivered in a single eventforrecordinevent.records:do_something_with(record.body)
You can register your Lambda functions as targets within an Amazon VPC Lattice service network. By doing this, your Lambda function becomes a service within the network, and clients that have access to the VPC Lattice service network can call your service using Payload V2.
Click here for more information about using AWS Lambda with Amazon VPC Lattice.
{"version":"2.0","path":"/todos","method":"GET","headers":{"user_agent":"curl/7.64.1","x-forwarded-for":"10.213.229.10","host":"test-lambda-service-3908sdf9u3u.dkfjd93.vpc-lattice-svcs.us-east-2.on.aws","accept":"*/*"},"queryStringParameters":{"order-id":"1"},"body":"{\"message\": \"Hello from Lambda!\"}","requestContext":{"serviceNetworkArn":"arn:aws:vpc-lattice:us-east-2:123456789012:servicenetwork/sn-0bf3f2882e9cc805a","serviceArn":"arn:aws:vpc-lattice:us-east-2:123456789012:service/svc-0a40eebed65f8d69c","targetGroupArn":"arn:aws:vpc-lattice:us-east-2:123456789012:targetgroup/tg-6d0ecf831eec9f09","identity":{"sourceVpcArn":"arn:aws:ec2:region:123456789012:vpc/vpc-0b8276c84697e7339","type":"AWS_IAM","principal":"arn:aws:sts::123456789012:assumed-role/example-role/057d00f8b51257ba3c853a0f248943cf","sessionName":"057d00f8b51257ba3c853a0f248943cf","x509SanDns":"example.com"},"region":"us-east-2","timeEpoch":"1696331543569073"}}
You can register your Lambda functions as targets within an Amazon VPC Lattice service network. By doing this, your Lambda function becomes a service within the network, and clients that have access to the VPC Lattice service network can call your service.
Click here for more information about using AWS Lambda with Amazon VPC Lattice.
1 2 3 4 5 6 7 8 910111213141516171819
fromaws_lambda_powertoolsimportLoggerfromaws_lambda_powertools.utilities.data_classesimportVPCLatticeEvent,event_sourcefromaws_lambda_powertools.utilities.typingimportLambdaContextlogger=Logger()@event_source(data_class=VPCLatticeEvent)deflambda_handler(event:VPCLatticeEvent,context:LambdaContext):logger.info(event.body)response={"isBase64Encoded":False,"statusCode":200,"headers":{"Content-Type":"application/text"},"body":"Event Response to VPC Lattice 🔥🚀🔥",}returnresponse
Alternatively, you can print out the fields to obtain more information. All classes come with a __str__ method that generates a dictionary string which can be quite useful for debugging.
However, certain events may contain sensitive fields such as secret_access_key and session_token, which are labeled as [SENSITIVE] to prevent any accidental disclosure of confidential information.
If we fail to deserialize a field value (e.g., JSON), they will appear as [Cannot be deserialized]
{"CodePipeline.job":{"id":"11111111-abcd-1111-abcd-111111abcdef","accountId":"111111111111","data":{"actionConfiguration":{"configuration":{"FunctionName":"MyLambdaFunctionForAWSCodePipeline","UserParameters":"some-input-such-as-a-URL"}},"inputArtifacts":[{"name":"ArtifactName","revision":null,"location":{"type":"S3","s3Location":{"bucketName":"the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890","objectKey":"the name of the application, for example CodePipelineDemoApplication.zip"}}}],"outputArtifacts":[],"artifactCredentials":{"accessKeyId":"AKIAIOSFODNN7EXAMPLE","secretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","sessionToken":"MIICiTCCAfICCQD6m7oRw0uXOjANBgkqhkiG9w0BAQUFADCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wHhcNMTEwNDI1MjA0NTIxWhcNMTIwNDI0MjA0NTIxWjCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMaK0dn+a4GmWIWJ21uUSfwfEvySWtC2XADZ4nB+BLYgVIk60CpiwsZ3G93vUEIO3IyNoH/f0wYK8m9TrDHudUZg3qX4waLG5M43q7Wgc/MbQITxOUSQv7c7ugFFDzQGBzZswY6786m86gpEIbb3OhjZnzcvQAaRHhdlQWIMm2nrAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAtCu4nUhVVxYUntneD9+h8Mg9q6q+auNKyExzyLwaxlAoo7TJHidbtS4J5iNmZgXL0FkbFFBjvSfpJIlJ00zbhNYS5f6GuoEDmFJl0ZxBHjJnyp378OD8uTs7fLvjx79LjSTbNYiytVbZPQUQ5Yaxu2jXnimvw3rrszlaEXAMPLE="},"continuationToken":"A continuation token if continuing job"}}}
{"account_id":"111111111111","data":{"action_configuration":{"configuration":{"decoded_user_parameters":"[Cannot be deserialized]","function_name":"MyLambdaFunctionForAWSCodePipeline","raw_event":"[SENSITIVE]","user_parameters":"some-input-such-as-a-URL"},"raw_event":"[SENSITIVE]"},"artifact_credentials":{"access_key_id":"AKIAIOSFODNN7EXAMPLE","expiration_time":"None","raw_event":"[SENSITIVE]","secret_access_key":"[SENSITIVE]","session_token":"[SENSITIVE]"},"continuation_token":"A continuation token if continuing job","encryption_key":"None","input_artifacts":[{"location":{"get_type":"S3","raw_event":"[SENSITIVE]","s3_location":{"bucket_name":"the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890","key":"the name of the application, for example CodePipelineDemoApplication.zip","object_key":"the name of the application, for example CodePipelineDemoApplication.zip","raw_event":"[SENSITIVE]"}},"name":"ArtifactName","raw_event":"[SENSITIVE]","revision":"None"}],"output_artifacts":[],"raw_event":"[SENSITIVE]"},"decoded_user_parameters":"[Cannot be deserialized]","get_id":"11111111-abcd-1111-abcd-111111abcdef","input_bucket_name":"the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890","input_object_key":"the name of the application, for example CodePipelineDemoApplication.zip","raw_event":"[SENSITIVE]","user_parameters":"some-input-such-as-a-URL"}