Direct Lambda Resolver. A custom AppSync resolver that bypasses Apache Velocity Template Language (VTL) and automatically maps your function's response to a GraphQL field.
Amplify GraphQL Transformer. Custom GraphQL directives that define your application's data model using Schema Definition Language (SDL), e.g., @function. Amplify CLI uses these directives to convert GraphQL SDL into fully descriptive AWS CloudFormation templates.
You must have an existing AppSync GraphQL API and IAM permissions to invoke your Lambda function. That said, there are no additional permissions required to use Event Handler, as routing has no dependencies beyond the standard library.
This is the sample infrastructure we use for the initial examples with an AppSync Direct Lambda Resolver.
    AWSTemplateFormatVersion: "2010-09-09"
    Transform: AWS::Serverless-2016-10-31
    Description: Hello world Direct Lambda Resolver

    Globals:
      Function:
        Timeout: 5
        Runtime: python3.12
        Tracing: Active
        Environment:
          Variables:
            # Powertools for AWS Lambda (Python) env vars: https://docs.powertools.aws.dev/lambda/python/latest/#environment-variables
            POWERTOOLS_LOG_LEVEL: INFO
            POWERTOOLS_LOGGER_SAMPLE_RATE: 0.1
            POWERTOOLS_LOGGER_LOG_EVENT: true
            POWERTOOLS_SERVICE_NAME: example

    Resources:
      TodosFunction:
        Type: AWS::Serverless::Function
        Properties:
          Handler: getting_started_graphql_api_resolver.lambda_handler
          CodeUri: ../src
          Description: Sample Direct Lambda Resolver

      # IAM Permissions and Roles

      AppSyncServiceRole:
        Type: "AWS::IAM::Role"
        Properties:
          AssumeRolePolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Principal:
                  Service:
                    - "appsync.amazonaws.com"
                Action:
                  - "sts:AssumeRole"

      InvokeLambdaResolverPolicy:
        Type: "AWS::IAM::Policy"
        Properties:
          PolicyName: "DirectAppSyncLambda"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: "lambda:invokeFunction"
                Resource:
                  - !GetAtt TodosFunction.Arn
          Roles:
            - !Ref AppSyncServiceRole

      # GraphQL API

      TodosApi:
        Type: "AWS::AppSync::GraphQLApi"
        Properties:
          Name: TodosApi
          AuthenticationType: "API_KEY"
          XrayEnabled: true

      TodosApiKey:
        Type: AWS::AppSync::ApiKey
        Properties:
          ApiId: !GetAtt TodosApi.ApiId

      TodosApiSchema:
        Type: "AWS::AppSync::GraphQLSchema"
        Properties:
          ApiId: !GetAtt TodosApi.ApiId
          DefinitionS3Location: ../src/getting_started_schema.graphql
        Metadata:
          cfn-lint:
            config:
              ignore_checks:
                - W3002 # allow relative path in DefinitionS3Location

      # Lambda Direct Data Source and Resolver

      TodosFunctionDataSource:
        Type: "AWS::AppSync::DataSource"
        Properties:
          ApiId: !GetAtt TodosApi.ApiId
          Name: "HelloWorldLambdaDirectResolver"
          Type: "AWS_LAMBDA"
          ServiceRoleArn: !GetAtt AppSyncServiceRole.Arn
          LambdaConfig:
            LambdaFunctionArn: !GetAtt TodosFunction.Arn

      ListTodosResolver:
        Type: "AWS::AppSync::Resolver"
        Properties:
          ApiId: !GetAtt TodosApi.ApiId
          TypeName: "Query"
          FieldName: "listTodos"
          DataSourceName: !GetAtt TodosFunctionDataSource.Name

      GetTodoResolver:
        Type: "AWS::AppSync::Resolver"
        Properties:
          ApiId: !GetAtt TodosApi.ApiId
          TypeName: "Query"
          FieldName: "getTodo"
          DataSourceName: !GetAtt TodosFunctionDataSource.Name

      CreateTodoResolver:
        Type: "AWS::AppSync::Resolver"
        Properties:
          ApiId: !GetAtt TodosApi.ApiId
          TypeName: "Mutation"
          FieldName: "createTodo"
          DataSourceName: !GetAtt TodosFunctionDataSource.Name

    Outputs:
      TodosFunction:
        Description: "HelloWorldLambdaFunctionARN"
        Value: !GetAtt TodosFunction.Arn

      TodosApi:
        Value: !GetAtt TodosApi.GraphQLUrl
You can define your functions to match GraphQL types and fields with the app.resolver() decorator.
What is a type and field?
A type would be a top-level GraphQL Type like Query, Mutation, Todo. A GraphQL Field would be listTodos under Query, createTodo under Mutation, etc.
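As a rough illustration of how a type and field pair identifies a single operation, the sketch below keys a plain dictionary by `(type, field)`. `RESOLVERS`, `register`, and `find_resolver` are hypothetical names used only for this illustration, not Powertools APIs, and the lookup is a simplification rather than the library's actual routing.

```python
from typing import Callable, Dict, Tuple

# Hypothetical registry keyed by (type name, field name); not the Powertools implementation
RESOLVERS: Dict[Tuple[str, str], Callable] = {}


def register(type_name: str, field_name: str) -> Callable:
    """Register a function as the resolver for one field of one GraphQL type."""

    def decorator(func: Callable) -> Callable:
        RESOLVERS[(type_name, field_name)] = func
        return func

    return decorator


@register("Query", "listTodos")
def list_todos() -> list:
    return []


@register("Mutation", "createTodo")
def create_todo(title: str) -> dict:
    return {"title": title, "completed": False}


def find_resolver(type_name: str, field_name: str) -> Callable:
    """Look up the function registered for a given type and field."""
    return RESOLVERS[(type_name, field_name)]
```

Here `Query` and `Mutation` are the types, while `listTodos` and `createTodo` are the fields under them; the same field name under a different type would be a separate entry.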
Here's an example with two separate functions to resolve getTodo and listTodos fields within the Query type. For completeness, we use Scalar type utilities to generate the right output based on our schema definition.
Important
GraphQL arguments are passed as function keyword arguments.
Example
The GraphQL Query getTodo(id: "todo_id_value") will call get_todo as get_todo(id="todo_id_value").
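The call shape above amounts to unpacking the GraphQL arguments as keyword arguments. Here is a minimal sketch in plain Python, assuming the arguments arrive under an `arguments` key as in the sample event used for testing in this page; `fake_event` is illustrative and the function is a stand-in, not the resolver from the example.

```python
def get_todo(id: str = "") -> dict:  # noqa: A002 - shadows built-in `id` to mirror the GraphQL argument name
    """Stand-in resolver that echoes the identifier it received."""
    return {"id": id}


# Illustrative event fragment; only the `arguments` mapping matters here
fake_event = {"fieldName": "getTodo", "arguments": {"id": "todo_id_value"}}

# Unpacking the mapping is equivalent to calling get_todo(id="todo_id_value")
todo = get_todo(**fake_event["arguments"])
```

Because arguments are matched by name, the parameter names in your function need to match the argument names in your schema, and defaults cover arguments the client omits.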
    from typing import List

    import requests
    from requests import Response

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.event_handler import AppSyncResolver
    from aws_lambda_powertools.logging import correlation_paths
    from aws_lambda_powertools.shared.types import TypedDict
    from aws_lambda_powertools.utilities.data_classes.appsync import scalar_types_utils
    from aws_lambda_powertools.utilities.typing import LambdaContext

    tracer = Tracer()
    logger = Logger()
    app = AppSyncResolver()


    class Todo(TypedDict, total=False):
        id: str  # noqa AA03 VNE003, required due to GraphQL Schema
        userId: str
        title: str
        completed: bool


    @app.resolver(type_name="Query", field_name="getTodo")
    @tracer.capture_method
    def get_todo(
        id: str = "",  # noqa AA03 VNE003 shadows built-in id to match query argument, e.g., getTodo(id: "some_id")
    ) -> Todo:
        logger.info(f"Fetching Todo {id}")
        todos: Response = requests.get(f"https://jsonplaceholder.typicode.com/todos/{id}")
        todos.raise_for_status()

        return todos.json()


    @app.resolver(type_name="Query", field_name="listTodos")
    @tracer.capture_method
    def list_todos() -> List[Todo]:
        todos: Response = requests.get("https://jsonplaceholder.typicode.com/todos")
        todos.raise_for_status()

        # for brevity, we'll limit to the first 10 only
        return todos.json()[:10]


    @app.resolver(type_name="Mutation", field_name="createTodo")
    @tracer.capture_method
    def create_todo(title: str) -> Todo:
        payload = {"userId": scalar_types_utils.make_id(), "title": title, "completed": False}  # dummy UUID str
        todo: Response = requests.post("https://jsonplaceholder.typicode.com/todos", json=payload)
        todo.raise_for_status()

        return todo.json()


    @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
    @tracer.capture_lambda_handler
    def lambda_handler(event: dict, context: LambdaContext) -> dict:
        return app.resolve(event, context)
    from typing import List

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.event_handler import AppSyncResolver
    from aws_lambda_powertools.logging import correlation_paths
    from aws_lambda_powertools.shared.types import TypedDict
    from aws_lambda_powertools.utilities.typing import LambdaContext

    tracer = Tracer()
    logger = Logger()
    app = AppSyncResolver()


    class Location(TypedDict, total=False):
        id: str  # noqa AA03 VNE003, required due to GraphQL Schema
        name: str
        description: str
        address: str


    @app.resolver(field_name="listLocations")
    @app.resolver(field_name="locations")
    @tracer.capture_method
    def get_locations(name: str, description: str = "") -> List[Location]:  # match GraphQL Query arguments
        return [{"name": name, "description": description}]


    @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
    @tracer.capture_lambda_handler
    def lambda_handler(event: dict, context: LambdaContext) -> dict:
        return app.resolve(event, context)
    import asyncio
    from typing import List

    import aiohttp

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.event_handler import AppSyncResolver
    from aws_lambda_powertools.logging import correlation_paths
    from aws_lambda_powertools.shared.types import TypedDict
    from aws_lambda_powertools.tracing import aiohttp_trace_config
    from aws_lambda_powertools.utilities.typing import LambdaContext

    tracer = Tracer()
    logger = Logger()
    app = AppSyncResolver()


    class Todo(TypedDict, total=False):
        id: str  # noqa AA03 VNE003, required due to GraphQL Schema
        userId: str
        title: str
        completed: bool


    @app.resolver(type_name="Query", field_name="listTodos")
    async def list_todos() -> List[Todo]:
        async with aiohttp.ClientSession(trace_configs=[aiohttp_trace_config()]) as session:
            async with session.get("https://jsonplaceholder.typicode.com/todos") as resp:
                return await resp.json()


    @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
    @tracer.capture_lambda_handler
    def lambda_handler(event: dict, context: LambdaContext) -> dict:
        result = app.resolve(event, context)

        return asyncio.run(result)
Amplify CLI generated functions use Pipenv as a dependency manager. Your function source code is located at amplify/backend/function/your-function-name.
Within your function's folder, add Powertools for AWS Lambda (Python) as a dependency with pipenv install aws-lambda-powertools.
Use the following code for merchantInfo and searchMerchant functions respectively.
    from typing import List

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.event_handler import AppSyncResolver
    from aws_lambda_powertools.logging import correlation_paths
    from aws_lambda_powertools.shared.types import TypedDict
    from aws_lambda_powertools.utilities.data_classes.appsync import scalar_types_utils
    from aws_lambda_powertools.utilities.typing import LambdaContext

    tracer = Tracer()
    logger = Logger()
    app = AppSyncResolver()


    class Location(TypedDict, total=False):
        id: str  # noqa AA03 VNE003, required due to GraphQL Schema
        name: str
        description: str
        address: str
        commonField: str


    @app.resolver(type_name="Query", field_name="listLocations")
    def list_locations(page: int = 0, size: int = 10) -> List[Location]:
        return [{"id": scalar_types_utils.make_id(), "name": "Smooth Grooves"}]


    @app.resolver(field_name="commonField")
    def common_field() -> str:
        # Would match all fieldNames matching 'commonField'
        return scalar_types_utils.make_id()


    @tracer.capture_lambda_handler
    @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
    def lambda_handler(event: dict, context: LambdaContext) -> dict:
        return app.resolve(event, context)
    from typing import List

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.event_handler import AppSyncResolver
    from aws_lambda_powertools.logging import correlation_paths
    from aws_lambda_powertools.shared.types import TypedDict
    from aws_lambda_powertools.utilities.data_classes.appsync import scalar_types_utils
    from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import (
        AppSyncResolverEvent,
    )
    from aws_lambda_powertools.utilities.typing import LambdaContext

    tracer = Tracer()
    logger = Logger()
    app = AppSyncResolver()


    class Location(TypedDict, total=False):
        id: str  # noqa AA03 VNE003, required due to GraphQL Schema
        name: str
        description: str
        address: str
        commonField: str


    class MyCustomModel(AppSyncResolverEvent):
        @property
        def country_viewer(self) -> str:
            return self.get_header_value(name="cloudfront-viewer-country", default_value="", case_sensitive=False)

        @property
        def api_key(self) -> str:
            return self.get_header_value(name="x-api-key", default_value="", case_sensitive=False)


    @app.resolver(type_name="Query", field_name="listLocations")
    def list_locations(page: int = 0, size: int = 10) -> List[Location]:
        # additional properties/methods will now be available under current_event
        if app.current_event:
            logger.debug(f"Request country origin: {app.current_event.country_viewer}")  # type: ignore[attr-defined]
        return [{"id": scalar_types_utils.make_id(), "name": "Perry, James and Carroll"}]


    @tracer.capture_lambda_handler
    @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
    def lambda_handler(event: dict, context: LambdaContext) -> dict:
        return app.resolve(event, context, data_model=MyCustomModel)
As the number of related GraphQL operations handled by a given Lambda function grows, it is natural to split them into separate files to ease maintenance. That's when the Router feature comes in handy.
Let's assume you have split_operation.py as your Lambda function entrypoint and routes in split_operation_module.py. This is how you'd use the Router feature.
We import Router instead of AppSyncResolver; the syntax is exactly the same.
    from typing import List

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.event_handler.graphql_appsync.router import Router
    from aws_lambda_powertools.shared.types import TypedDict

    tracer = Tracer()
    logger = Logger()
    router = Router()


    class Location(TypedDict, total=False):
        id: str  # noqa AA03 VNE003, required due to GraphQL Schema
        name: str
        description: str
        address: str


    @router.resolver(field_name="listLocations")
    @router.resolver(field_name="locations")
    @tracer.capture_method
    def get_locations(name: str, description: str = "") -> List[Location]:  # match GraphQL Query arguments
        return [{"name": name, "description": description}]
We use the include_router method to include all location operations registered in the router global object.
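Conceptually, including a router means the app takes over the resolvers the router has collected. The sketch below models that idea with two small stand-in classes; `MiniRouter` and `MiniApp` are hypothetical and much simpler than the real `Router` and `AppSyncResolver`, whose internals may differ.

```python
from typing import Any, Callable, Dict


class MiniRouter:
    """Hypothetical stand-in that only collects resolvers by field name."""

    def __init__(self) -> None:
        self.resolvers: Dict[str, Callable] = {}

    def resolver(self, field_name: str) -> Callable:
        def decorator(func: Callable) -> Callable:
            self.resolvers[field_name] = func
            return func

        return decorator


class MiniApp(MiniRouter):
    """Hypothetical stand-in for the entry point that owns the final registry."""

    def include_router(self, router: MiniRouter) -> None:
        # Copy every resolver registered on the router into the app's registry
        self.resolvers.update(router.resolvers)

    def resolve(self, field_name: str, **arguments: Any) -> Any:
        return self.resolvers[field_name](**arguments)


router = MiniRouter()


@router.resolver(field_name="listLocations")
@router.resolver(field_name="locations")
def get_locations(name: str, description: str = "") -> list:
    return [{"name": name, "description": description}]


app = MiniApp()
app.include_router(router)
```

After `include_router`, both field names resolve through the app even though they were registered in a separate module-level object.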
You can use append_context when you want to share data between your App and Router instances. Any data you share will be available via the context dictionary of your App or Router instance.
Info
For safety, we always clear any data available in the context dictionary after each invocation.
Tip
This can also be useful for middlewares injecting contextual information before a request is processed.
    import split_operation_append_context_module

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.event_handler import AppSyncResolver
    from aws_lambda_powertools.logging import correlation_paths
    from aws_lambda_powertools.utilities.typing import LambdaContext

    tracer = Tracer()
    logger = Logger()
    app = AppSyncResolver()
    app.include_router(split_operation_append_context_module.router)


    @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
    @tracer.capture_lambda_handler
    def lambda_handler(event: dict, context: LambdaContext) -> dict:
        app.append_context(is_admin=True)  # arbitrary number of key=value data
        return app.resolve(event, context)
    from typing import List

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.event_handler.graphql_appsync.router import Router
    from aws_lambda_powertools.shared.types import TypedDict

    tracer = Tracer()
    logger = Logger()
    router = Router()


    class Location(TypedDict, total=False):
        id: str  # noqa AA03 VNE003, required due to GraphQL Schema
        name: str
        description: str
        address: str


    @router.resolver(field_name="listLocations")
    @router.resolver(field_name="locations")
    @tracer.capture_method
    def get_locations(name: str, description: str = "") -> List[Location]:  # match GraphQL Query arguments
        is_admin: bool = router.context.get("is_admin", False)
        return [{"name": name, "description": description}] if is_admin else []
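The context lifecycle described earlier (append, read during the invocation, then clear) can be modelled in a few lines. This is a simplified sketch, not the Powertools implementation; `MiniApp` and `whoami` are hypothetical names.

```python
from typing import Any, Callable, Dict


class MiniApp:
    """Hypothetical stand-in showing a per-invocation context dictionary."""

    def __init__(self) -> None:
        self.context: Dict[str, Any] = {}

    def append_context(self, **additional_context: Any) -> None:
        self.context.update(additional_context)

    def resolve(self, resolver: Callable[[], Any]) -> Any:
        try:
            return resolver()
        finally:
            # Clear shared data so it cannot leak into the next invocation
            self.context.clear()


app = MiniApp()


def whoami() -> str:
    return "admin" if app.context.get("is_admin", False) else "guest"


app.append_context(is_admin=True)
first = app.resolve(whoami)  # context is populated for this call
second = app.resolve(whoami)  # context was cleared after the first call
```

This is why the example above calls `append_context` inside `lambda_handler` on every invocation rather than once at import time.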
    from __future__ import annotations

    from typing import Any

    from aws_lambda_powertools.event_handler import AppSyncResolver
    from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent
    from aws_lambda_powertools.utilities.typing import LambdaContext

    app = AppSyncResolver()

    # mimic DB data for simplicity
    posts_related = {
        "1": {"title": "post1"},
        "2": {"title": "post2"},
        "3": {"title": "post3"},
    }


    def search_batch_posts(posts: list) -> dict[str, Any]:
        return {post_id: posts_related.get(post_id) for post_id in posts}


    @app.batch_resolver(type_name="Query", field_name="relatedPosts")
    def related_posts(event: list[AppSyncResolverEvent]) -> list[Any]:  # (1)!
        # Extract all post_ids in order
        post_ids: list = [record.source.get("post_id") for record in event]  # (2)!

        # Get unique post_ids while preserving order
        unique_post_ids = list(dict.fromkeys(post_ids))

        # Fetch posts in a single batch operation
        fetched_posts = search_batch_posts(unique_post_ids)

        # Return results in original order
        return [fetched_posts.get(post_id) for post_id in post_ids]


    def lambda_handler(event, context: LambdaContext) -> dict:
        return app.resolve(event, context)
(1) The entire batch is sent to the resolver. You need to iterate through it to process all records.
(2) We use post_id as our unique identifier of the GraphQL request.
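To see the effect of de-duplicating and then restoring the original order, here is a plain-Python sketch of the same approach run against a hand-written batch. The `batch` list and its `source` payloads are made-up sample data, plain dictionaries stand in for `AppSyncResolverEvent` objects, and `lookups` exists only to count simulated database reads.

```python
from typing import Any, Dict, List, Optional

# Mimic DB data, mirroring the example above
POSTS: Dict[str, Dict[str, str]] = {
    "1": {"title": "post1"},
    "2": {"title": "post2"},
    "3": {"title": "post3"},
}

lookups: List[str] = []  # records every simulated DB read


def fetch_post(post_id: str) -> Optional[Dict[str, str]]:
    lookups.append(post_id)
    return POSTS.get(post_id)


def related_posts(batch: List[Dict[str, Any]]) -> List[Optional[Dict[str, str]]]:
    post_ids = [record["source"]["post_id"] for record in batch]
    unique_ids = list(dict.fromkeys(post_ids))  # de-duplicate, keep first-seen order
    fetched = {post_id: fetch_post(post_id) for post_id in unique_ids}
    # One result per incoming record, in the same position as the record
    return [fetched[post_id] for post_id in post_ids]


# Made-up batch: post "3" appears twice and post "9" does not exist
batch = [
    {"source": {"post_id": "3"}},
    {"source": {"post_id": "1"}},
    {"source": {"post_id": "3"}},
    {"source": {"post_id": "9"}},
]
results = related_posts(batch)
```

The repeated identifier is read only once, yet the result list still has one entry per incoming record, with `None` standing in for the missing post.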
    from __future__ import annotations

    from typing import Any

    from aws_lambda_powertools.event_handler import AppSyncResolver
    from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent
    from aws_lambda_powertools.utilities.typing import LambdaContext

    app = AppSyncResolver()

    # mimic DB data for simplicity
    posts_related = {
        "1": {"title": "post1"},
        "2": {"title": "post2"},
        "3": {"title": "post3"},
    }


    async def search_batch_posts(posts: list) -> dict[str, Any]:
        return {post_id: posts_related.get(post_id) for post_id in posts}


    @app.async_batch_resolver(type_name="Query", field_name="relatedPosts")
    async def related_posts(event: list[AppSyncResolverEvent]) -> list[Any]:
        # Extract all post_ids in order
        post_ids: list = [record.source.get("post_id") for record in event]

        # Get unique post_ids while preserving order
        unique_post_ids = list(dict.fromkeys(post_ids))

        # Fetch posts in a single batch operation
        fetched_posts = await search_batch_posts(unique_post_ids)

        # Return results in original order
        return [fetched_posts.get(post_id) for post_id in post_ids]


    def lambda_handler(event, context: LambdaContext) -> dict:
        return app.resolve(event, context)  # (1)!
(1) async_batch_resolver takes care of running and waiting for coroutine completion.
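For intuition, a synchronous entry point can drive a coroutine to completion with `asyncio.run`. The sketch below shows that general pattern in plain Python; it is an assumption about the overall shape, not a description of how `async_batch_resolver` is implemented, and `handler` and `fetch_post` are hypothetical names.

```python
import asyncio
from typing import Any, Dict, List, Optional

POSTS: Dict[str, Dict[str, str]] = {"1": {"title": "post1"}, "2": {"title": "post2"}}


async def fetch_post(post_id: str) -> Optional[Dict[str, str]]:
    await asyncio.sleep(0)  # stand-in for non-blocking I/O
    return POSTS.get(post_id)


async def related_posts(post_ids: List[str]) -> List[Any]:
    # Run the lookups concurrently; gather preserves the input order
    return await asyncio.gather(*(fetch_post(post_id) for post_id in post_ids))


def handler(post_ids: List[str]) -> List[Any]:
    # The caller stays synchronous; asyncio.run waits for the coroutine to finish
    return asyncio.run(related_posts(post_ids))
```

From the caller's point of view, `handler` behaves like any ordinary function, which is the same experience `lambda_handler` has in the example above.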
    from typing import List

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.event_handler import AppSyncResolver
    from aws_lambda_powertools.logging import correlation_paths
    from aws_lambda_powertools.shared.types import TypedDict
    from aws_lambda_powertools.utilities.typing import LambdaContext

    tracer = Tracer()
    logger = Logger()
    app = AppSyncResolver()


    class Location(TypedDict, total=False):
        id: str  # noqa AA03 VNE003, required due to GraphQL Schema
        name: str
        description: str
        address: str


    @app.resolver(field_name="listLocations")
    @app.resolver(field_name="locations")
    @tracer.capture_method
    def get_locations(name: str, description: str = "") -> List[Location]:  # match GraphQL Query arguments
        return [{"name": name, "description": description}]


    @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
    @tracer.capture_lambda_handler
    def lambda_handler(event: dict, context: LambdaContext) -> dict:
        return app.resolve(event, context)
    {
      "typeName": "Query",
      "fieldName": "listLocations",
      "arguments": {
        "name": "Perkins-Reed",
        "description": "Nulla sed amet. Earum libero qui sunt perspiciatis. Non aliquid accusamus."
      },
      "selectionSetList": [
        "id",
        "name"
      ],
      "identity": {
        "claims": {
          "sub": "192879fc-a240-4bf1-ab5a-d6a00f3063f9",
          "email_verified": true,
          "iss": "https://cognito-idp.us-west-2.amazonaws.com/us-west-xxxxxxxxxxx",
          "phone_number_verified": false,
          "cognito:username": "jdoe",
          "aud": "7471s60os7h0uu77i1tk27sp9n",
          "event_id": "bc334ed8-a938-4474-b644-9547e304e606",
          "token_use": "id",
          "auth_time": 1599154213,
          "phone_number": "+19999999999",
          "exp": 1599157813,
          "iat": 1599154213,
          "email": "jdoe@email.com"
        },
        "defaultAuthStrategy": "ALLOW",
        "groups": null,
        "issuer": "https://cognito-idp.us-west-2.amazonaws.com/us-west-xxxxxxxxxxx",
        "sourceIp": [
          "1.1.1.1"
        ],
        "sub": "192879fc-a240-4bf1-ab5a-d6a00f3063f9",
        "username": "jdoe"
      },
      "request": {
        "headers": {
          "x-amzn-trace-id": "Root=1-60488877-0b0c4e6727ab2a1c545babd0",
          "x-forwarded-for": "127.0.0.1",
          "cloudfront-viewer-country": "NL",
          "x-api-key": "da1-c33ullkbkze3jg5hf5ddgcs4fq"
        }
      }
    }
Here is an example for testing asynchronous resolvers. It requires the pytest-asyncio package and tests a specific async GraphQL operation.
Note
Alternatively, you can continue to call the lambda_handler function synchronously, as it uses asyncio.run to wait for the coroutine to complete.
    import json
    from dataclasses import dataclass
    from pathlib import Path
    from typing import List

    import pytest
    from assert_async_graphql_response_module import (  # instance of AppSyncResolver
        Todo,
        app,
    )


    @pytest.fixture
    def lambda_context():
        @dataclass
        class LambdaContext:
            function_name: str = "test"
            memory_limit_in_mb: int = 128
            invoked_function_arn: str = "arn:aws:lambda:eu-west-1:123456789012:function:test"
            aws_request_id: str = "da658bd3-2d6f-4e7b-8ec2-937234644fdc"

        return LambdaContext()


    @pytest.mark.asyncio
    async def test_async_direct_resolver(lambda_context):
        # GIVEN
        fake_event = json.loads(Path("assert_async_graphql_response.json").read_text())

        # WHEN
        result: List[Todo] = await app(fake_event, lambda_context)
        # alternatively, you can also run a sync test against `lambda_handler`
        # since `lambda_handler` awaits the coroutine to complete

        # THEN
        assert result[0]["userId"] == 1
        assert result[0]["id"] == 1
        assert result[0]["completed"] is False
    import asyncio
    from typing import List

    import aiohttp

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.event_handler import AppSyncResolver
    from aws_lambda_powertools.logging import correlation_paths
    from aws_lambda_powertools.shared.types import TypedDict
    from aws_lambda_powertools.tracing import aiohttp_trace_config
    from aws_lambda_powertools.utilities.typing import LambdaContext

    tracer = Tracer()
    logger = Logger()
    app = AppSyncResolver()


    class Todo(TypedDict, total=False):
        id: str  # noqa AA03 VNE003, required due to GraphQL Schema
        userId: str
        title: str
        completed: bool


    @app.resolver(type_name="Query", field_name="listTodos")
    async def list_todos() -> List[Todo]:
        async with aiohttp.ClientSession(trace_configs=[aiohttp_trace_config()]) as session:
            async with session.get("https://jsonplaceholder.typicode.com/todos") as resp:
                result: List[Todo] = await resp.json()
                return result[:2]  # first two results to demo assertion


    @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
    @tracer.capture_lambda_handler
    def lambda_handler(event: dict, context: LambdaContext) -> dict:
        result = app.resolve(event, context)

        return asyncio.run(result)