Type hinting and code completion for common event types
Helper functions for decoding/deserializing nested fields
Docstrings for fields contained in event schemas
Background
When authoring Lambda functions, you often need to understand the schema of the event dictionary which is passed to the
handler. There are several common event types which follow a specific schema, depending on the service triggering the
Lambda function.
The classes are initialized by passing the Lambda event object to the constructor of the appropriate data class or
by using the event_source decorator.
For example, if your Lambda function is being triggered by an API Gateway proxy integration, you can use the
APIGatewayProxyEvent class.
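To make the two initialization styles above concrete, here is a minimal, standard-library-only sketch of the pattern. It is not the library's implementation: `DictWrapper`, `ProxyEventSketch` and `event_source_sketch` are hypothetical names used purely for illustration, and the event fields shown assume the usual API Gateway proxy payload keys (`httpMethod`, `path`, `headers`).

```python
from functools import wraps
from typing import Any, Callable, Dict, Optional, Type


class DictWrapper:
    """Hypothetical base class: keeps the raw event dict and exposes it read-only."""

    def __init__(self, data: Dict[str, Any]):
        self._data = data

    @property
    def raw_event(self) -> Dict[str, Any]:
        return self._data


class ProxyEventSketch(DictWrapper):
    """Illustrative stand-in for a data class such as APIGatewayProxyEvent."""

    @property
    def http_method(self) -> str:
        return self._data["httpMethod"]

    @property
    def path(self) -> str:
        return self._data["path"]

    def get_header_value(self, name: str, default: Optional[str] = None) -> Optional[str]:
        # Case-insensitive header lookup
        headers = self._data.get("headers") or {}
        for key, value in headers.items():
            if key.lower() == name.lower():
                return value
        return default


def event_source_sketch(data_class: Type[DictWrapper]) -> Callable:
    """Hypothetical decorator: wraps the raw event before calling the handler."""

    def decorator(handler: Callable) -> Callable:
        @wraps(handler)
        def wrapper(event: Dict[str, Any], context: Any):
            return handler(data_class(event), context)

        return wrapper

    return decorator


@event_source_sketch(data_class=ProxyEventSketch)
def lambda_handler(event: ProxyEventSketch, context: Any) -> Dict[str, Any]:
    return {"statusCode": 200, "body": f"{event.http_method} {event.path}"}
```

Calling `ProxyEventSketch(raw_event)` directly inside the handler is equivalent to using the decorator; the decorator simply moves the wrapping step out of the handler body.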
The examples below are far from exhaustive; the data classes are designed to be self-documenting through
autocompletion, types and docstrings.
from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent

logger = Logger()


@event_source(data_class=ActiveMQEvent)
def lambda_handler(event: ActiveMQEvent, context):
    for message in event.messages:
        logger.debug(f"MessageID: {message.message_id}")
        data: Dict = message.json_data
        # %s placeholder so the extra argument is formatted into the message
        logger.debug("Process json in base64 encoded data str: %s", data)
Use APIGatewayAuthorizerRequestEvent for type REQUEST and APIGatewayAuthorizerTokenEvent for type TOKEN.
This example uses the APIGatewayAuthorizerResponse to decline a given request if the user is not found.
When the user is found, it includes the user details in the request context that will be available to the back-end, and returns a full access policy for admin users.
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
    DENY_ALL_RESPONSE,
    APIGatewayAuthorizerRequestEvent,
    APIGatewayAuthorizerResponse,
    HttpVerb,
)
from secrets import compare_digest


def get_user_by_token(token):
    if compare_digest(token, "admin-foo"):
        return {"id": 0, "name": "Admin", "isAdmin": True}
    elif compare_digest(token, "regular-foo"):
        return {"id": 1, "name": "Joe"}
    else:
        return None


@event_source(data_class=APIGatewayAuthorizerRequestEvent)
def handler(event: APIGatewayAuthorizerRequestEvent, context):
    user = get_user_by_token(event.get_header_value("Authorization"))

    if user is None:
        # No user was found
        # to return 401 - `{"message":"Unauthorized"}`, but pollutes lambda error count metrics
        # raise Exception("Unauthorized")
        # to return 403 - `{"message":"Forbidden"}`
        return DENY_ALL_RESPONSE

    # parse the `methodArn` as an `APIGatewayRouteArn`
    arn = event.parsed_arn

    # Create the response builder from parts of the `methodArn`
    # and set the logged in user id and context
    policy = APIGatewayAuthorizerResponse(
        principal_id=user["id"],
        context=user,
        region=arn.region,
        aws_account_id=arn.aws_account_id,
        api_id=arn.api_id,
        stage=arn.stage,
    )

    # Conditional IAM Policy
    if user.get("isAdmin", False):
        policy.allow_all_routes()
    else:
        policy.allow_route(HttpVerb.GET, "/user-profile")

    return policy.asdict()
This example looks up user details via the x-token header. It uses APIGatewayAuthorizerResponseV2 to return a deny policy when the user is not found or not authorized.
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
    APIGatewayAuthorizerEventV2,
    APIGatewayAuthorizerResponseV2,
)
from secrets import compare_digest


def get_user_by_token(token):
    if compare_digest(token, "Foo"):
        return {"name": "Foo"}
    return None


@event_source(data_class=APIGatewayAuthorizerEventV2)
def handler(event: APIGatewayAuthorizerEventV2, context):
    user = get_user_by_token(event.get_header_value("x-token"))

    if user is None:
        # No user was found, so we return not authorized
        return APIGatewayAuthorizerResponseV2().asdict()

    # Found the user and setting the details in the context
    return APIGatewayAuthorizerResponseV2(authorize=True, context=user).asdict()
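For readers who want to see the shape of the dictionary such an authorizer hands back, the following is a rough, standard-library-only sketch. It assumes the HTTP API "simple response" format (`isAuthorized` plus an optional `context`); `USERS_BY_TOKEN`, `simple_response` and this version of `get_user_by_token` are hypothetical helpers for illustration, not part of the library.

```python
from secrets import compare_digest
from typing import Any, Dict, Optional

# Hypothetical in-memory token table, for illustration only
USERS_BY_TOKEN: Dict[str, Dict[str, Any]] = {"Foo": {"name": "Foo"}}


def get_user_by_token(token: Optional[str]) -> Optional[Dict[str, Any]]:
    """Return the user for a token, or None when the token is missing or unknown."""
    if token is None:
        return None
    for known_token, user in USERS_BY_TOKEN.items():
        # Compare as bytes so non-ASCII input cannot raise a TypeError
        if compare_digest(token.encode("utf-8"), known_token.encode("utf-8")):
            return user
    return None


def simple_response(authorize: bool = False, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build the assumed simple-response payload; denies by default."""
    response: Dict[str, Any] = {"isAuthorized": authorize}
    if context:
        response["context"] = context
    return response


def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    # Normalise header names so the lookup is case insensitive
    headers = {key.lower(): value for key, value in (event.get("headers") or {}).items()}
    user = get_user_by_token(headers.get("x-token"))
    if user is None:
        return simple_response()
    return simple_response(authorize=True, context=user)
```

Defaulting to a deny response keeps the failure path safe: any code path that forgets to pass `authorize=True` rejects the request.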
This example extracts the requestId as the correlation_id for logging, uses the @event_source decorator, and builds the AppSync authorizer response using the AppSyncAuthorizerResponse helper.
from typing import Dict

from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.logging.logger import Logger
from aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event import (
    AppSyncAuthorizerEvent,
    AppSyncAuthorizerResponse,
)
from aws_lambda_powertools.utilities.data_classes.event_source import event_source

logger = Logger()


def get_user_by_token(token: str):
    """Look up a user by token"""
    ...


@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER)
@event_source(data_class=AppSyncAuthorizerEvent)
def lambda_handler(event: AppSyncAuthorizerEvent, context) -> Dict:
    user = get_user_by_token(event.authorization_token)

    if not user:
        # No user found, return not authorized
        return AppSyncAuthorizerResponse().asdict()

    return AppSyncAuthorizerResponse(
        authorize=True,
        resolver_context={"id": user.id},
        # Only allow admins to delete events
        deny_fields=None if user.is_admin else ["Mutation.deleteEvent"],
    ).asdict()
In this example, we also use the new Logger correlation_id and built-in correlation_paths to extract, if available, the X-Ray Trace ID from the AppSync request headers:
from aws_lambda_powertools.logging import Logger, correlation_paths
from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import (
    AppSyncResolverEvent,
    AppSyncIdentityCognito,
)

logger = Logger()


def get_locations(name: str = None, size: int = 0, page: int = 0):
    """Your resolver logic here"""


@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
def lambda_handler(event, context):
    event: AppSyncResolverEvent = AppSyncResolverEvent(event)

    # Case insensitive look up of request headers
    x_forwarded_for = event.get_header_value("x-forwarded-for")

    # Support for AppSyncIdentityCognito or AppSyncIdentityIAM identity types
    assert isinstance(event.identity, AppSyncIdentityCognito)
    identity: AppSyncIdentityCognito = event.identity

    # Logging with correlation_id
    logger.debug({"x-forwarded-for": x_forwarded_for, "username": identity.username})

    if event.type_name == "Merchant" and event.field_name == "locations":
        return get_locations(**event.arguments)

    raise ValueError(f"Unsupported field resolver: {event.field_name}")
from aws_lambda_powertools.utilities.data_classes import event_source, CloudWatchDashboardCustomWidgetEvent

# Markdown returned when CloudWatch asks the widget to describe itself
DOCS = """
## Echo
A simple echo script. Anything passed in ```echo``` parameter is returned as the content of custom widget.

### Widget parameters
Param | Description
---|---
**echo** | The content to echo back

### Example parameters
```yaml
echo: <h1>Hello world</h1>
```
"""


@event_source(data_class=CloudWatchDashboardCustomWidgetEvent)
def lambda_handler(event: CloudWatchDashboardCustomWidgetEvent, context):
    if event.describe:
        return DOCS

    # You can directly return HTML or JSON content
    # Alternatively, you can return markdown that will be rendered by CloudWatch
    echo = event.widget_context.params["echo"]
    return {"markdown": f"# {echo}"}
CloudWatch Logs events by default are compressed and base64 encoded. You can use the helper function provided to decode,
decompress and parse json data from the event.
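As a rough illustration of what that decoding involves, the sketch below performs the three steps (base64 decode, gzip decompress, JSON parse) with the standard library only. It assumes the usual CloudWatch Logs subscription payload, where the compressed data sits under `event["awslogs"]["data"]`; `decode_cloudwatch_logs` and `encode_sample` are hypothetical helper names, not the library's API.

```python
import base64
import gzip
import json
from typing import Any, Dict


def decode_cloudwatch_logs(event: Dict[str, Any]) -> Dict[str, Any]:
    """Base64-decode, gunzip and JSON-parse the CloudWatch Logs payload."""
    compressed = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(compressed))


def encode_sample(decoded: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: build a test event by reversing the three steps."""
    compressed = gzip.compress(json.dumps(decoded).encode("utf-8"))
    return {"awslogs": {"data": base64.b64encode(compressed).decode("ascii")}}


def lambda_handler(event: Dict[str, Any], context: Any) -> int:
    decoded = decode_cloudwatch_logs(event)
    # Each entry in logEvents carries an id, a timestamp and the log message
    for log_event in decoded.get("logEvents", []):
        print(log_event["message"])
    return len(decoded.get("logEvents", []))
```

The data class helper saves you from repeating these steps in every function and returns typed objects rather than a plain dictionary.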
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source, CodePipelineJobEvent

logger = Logger()


@event_source(data_class=CodePipelineJobEvent)
def lambda_handler(event, context):
    """The Lambda function handler

    If a continuing job then checks the CloudFormation stack status
    and updates the job accordingly.

    If a new job then kick off an update or creation of the target
    CloudFormation stack.
    """

    # Extract the Job ID
    job_id = event.get_id

    # Extract the params
    params: dict = event.decoded_user_parameters
    stack = params["stack"]
    artifact_name = params["artifact"]
    template_file = params["file"]

    try:
        if event.data.continuation_token:
            # If we're continuing then the create/update has already been triggered
            # we just need to check if it has finished.
            check_stack_update_status(job_id, stack)
        else:
            template = event.get_artifact(artifact_name, template_file)
            # Kick off a stack update or create
            start_update_or_create(job_id, stack, template)
    except Exception as e:
        # If any other exceptions which we didn't expect are raised
        # then fail the job and log the exception message.
        logger.exception("Function failed due to exception.")
        put_job_failure(job_id, "Function exception: " + str(e))

    logger.debug("Function complete.")
    return "Complete."
Cognito User Pools have several different Lambda trigger sources, each of which maps to a different data class that
can be imported from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event.
The DynamoDB data class utility provides the base class for DynamoDBStreamEvent, a typed class for
attribute values (AttributeValue), as well as enums for stream view type (StreamViewType) and event type
(DynamoDBRecordEventName).
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
    DynamoDBStreamEvent,
    DynamoDBRecordEventName,
)


def lambda_handler(event, context):
    event: DynamoDBStreamEvent = DynamoDBStreamEvent(event)

    # Multiple records can be delivered in a single event
    for record in event.records:
        if record.event_name == DynamoDBRecordEventName.MODIFY:
            do_something_with(record.dynamodb.new_image)
            do_something_with(record.dynamodb.old_image)
Kinesis events by default contain base64 encoded data. You can use the helper function to access the data either as json
or plain text, depending on the original payload.
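To show what those helpers amount to, here is a small standard-library sketch of the decoding step. It assumes the usual Kinesis record layout, with the base64 string under `record["kinesis"]["data"]`; the free functions `data_as_text`, `data_as_json` and `make_record` are illustrative stand-ins, not the library's implementation.

```python
import base64
import json
from typing import Any, Dict


def data_as_text(record: Dict[str, Any]) -> str:
    """Decode the base64 payload of one record as UTF-8 text."""
    return base64.b64decode(record["kinesis"]["data"]).decode("utf-8")


def data_as_json(record: Dict[str, Any]) -> Any:
    """Decode the payload as text, then parse it as JSON."""
    return json.loads(data_as_text(record))


def make_record(payload: bytes) -> Dict[str, Any]:
    """Hypothetical helper: build a minimal record the way Kinesis delivers it."""
    return {"kinesis": {"data": base64.b64encode(payload).decode("ascii")}}


def lambda_handler(event: Dict[str, Any], context: Any) -> list:
    # Multiple records can be delivered in a single event
    return [data_as_json(record) for record in event["Records"]]
```

Only call the JSON variant when the producer is known to write JSON; for anything else, decode as text and parse it yourself.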
from aws_lambda_powertools.utilities.data_classes import event_source, KinesisStreamEvent


@event_source(data_class=KinesisStreamEvent)
def lambda_handler(event: KinesisStreamEvent, context):
    kinesis_record = next(event.records).kinesis

    # if data was delivered as text
    data = kinesis_record.data_as_text()

    # if data was delivered as json
    data = kinesis_record.data_as_json()

    do_something_with(data)
from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent

logger = Logger()


@event_source(data_class=RabbitMQEvent)
def lambda_handler(event: RabbitMQEvent, context):
    for queue_name, messages in event.rmq_messages_by_queue.items():
        logger.debug(f"Messages for queue: {queue_name}")
        for message in messages:
            logger.debug(f"MessageID: {message.basic_properties.message_id}")
            data: Dict = message.json_data
            # %s placeholder so the extra argument is formatted into the message
            logger.debug("Process json in base64 encoded data str: %s", data)
from urllib.parse import unquote_plus

from aws_lambda_powertools.utilities.data_classes import event_source, S3Event


@event_source(data_class=S3Event)
def lambda_handler(event: S3Event, context):
    bucket_name = event.bucket_name

    # Multiple records can be delivered in a single event
    for record in event.records:
        object_key = unquote_plus(record.s3.get_object.key)

        do_something_with(f"{bucket_name}/{object_key}")
import boto3
import requests

from aws_lambda_powertools import Logger
from aws_lambda_powertools.logging.correlation_paths import S3_OBJECT_LAMBDA
from aws_lambda_powertools.utilities.data_classes.s3_object_event import S3ObjectLambdaEvent

logger = Logger()
session = boto3.Session()
s3 = session.client("s3")


@logger.inject_lambda_context(correlation_id_path=S3_OBJECT_LAMBDA, log_event=True)
def lambda_handler(event, context):
    event = S3ObjectLambdaEvent(event)

    # Get object from S3
    response = requests.get(event.input_s3_url)
    original_object = response.content.decode("utf-8")

    # Make changes to the object about to be returned
    transformed_object = original_object.upper()

    # Write object back to S3 Object Lambda
    s3.write_get_object_response(
        Body=transformed_object, RequestRoute=event.request_route, RequestToken=event.request_token
    )

    return {"status_code": 200}
from aws_lambda_powertools.utilities.data_classes import event_source, SESEvent


@event_source(data_class=SESEvent)
def lambda_handler(event: SESEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        mail = record.ses.mail
        common_headers = mail.common_headers

        do_something_with(common_headers.to, common_headers.subject)
from aws_lambda_powertools.utilities.data_classes import event_source, SNSEvent


@event_source(data_class=SNSEvent)
def lambda_handler(event: SNSEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        message = record.sns.message
        subject = record.sns.subject

        do_something_with(subject, message)
from aws_lambda_powertools.utilities.data_classes import event_source, SQSEvent


@event_source(data_class=SQSEvent)
def lambda_handler(event: SQSEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        do_something_with(record.body)