Module aws_lambda_powertools.event_handler.appsync
Classes
class AppSyncResolver
-
AppSync GraphQL API Resolver
Example
from aws_lambda_powertools.event_handler import AppSyncResolver app = AppSyncResolver() @app.resolver(type_name="Query", field_name="listLocations") def list_locations(page: int = 0, size: int = 10) -> list: # Your logic to fetch locations with arguments passed in return [{"id": 100, "name": "Smooth Grooves"}] @app.resolver(type_name="Merchant", field_name="extraInfo") def get_extra_info() -> dict: # Can use "app.current_event.source" to filter within the parent context account_type = app.current_event.source["accountType"] method = "BTC" if account_type == "NEW" else "USD" return {"preferredPaymentMethod": method} @app.resolver(field_name="commonField") def common_field() -> str: # Would match all fieldNames matching 'commonField' return str(uuid.uuid4())
Initialize a new instance of the AppSyncResolver.
Expand source code
class AppSyncResolver(Router): """ AppSync GraphQL API Resolver Example ------- ```python from aws_lambda_powertools.event_handler import AppSyncResolver app = AppSyncResolver() @app.resolver(type_name="Query", field_name="listLocations") def list_locations(page: int = 0, size: int = 10) -> list: # Your logic to fetch locations with arguments passed in return [{"id": 100, "name": "Smooth Grooves"}] @app.resolver(type_name="Merchant", field_name="extraInfo") def get_extra_info() -> dict: # Can use "app.current_event.source" to filter within the parent context account_type = app.current_event.source["accountType"] method = "BTC" if account_type == "NEW" else "USD" return {"preferredPaymentMethod": method} @app.resolver(field_name="commonField") def common_field() -> str: # Would match all fieldNames matching 'commonField' return str(uuid.uuid4()) ``` """ def __init__(self): """ Initialize a new instance of the AppSyncResolver. """ super().__init__() self.context = {} # early init as customers might add context before event resolution def __call__( self, event: dict, context: LambdaContext, data_model: Type[AppSyncResolverEvent] = AppSyncResolverEvent, ) -> Any: """Implicit lambda handler which internally calls `resolve`""" return self.resolve(event, context, data_model) def resolve( self, event: Union[dict, List[Dict]], context: LambdaContext, data_model: Type[AppSyncResolverEvent] = AppSyncResolverEvent, ) -> Any: """Resolves the response based on the provide event and decorator routes Parameters ---------- event : dict | List[Dict] Lambda event either coming from batch processing endpoint or from standard processing endpoint context : LambdaContext Lambda context data_model: Your data data_model to decode AppSync event, by default AppSyncResolverEvent Example ------- ```python from aws_lambda_powertools.event_handler import AppSyncResolver from aws_lambda_powertools.utilities.typing import LambdaContext @app.resolver(field_name="createSomething") def create_something(id: str): # noqa AA03 VNE003 return id def handler(event, context: LambdaContext): return app.resolve(event, context) ``` **Bringing custom models** ```python from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.logging import correlation_paths from aws_lambda_powertools.event_handler import AppSyncResolver tracer = Tracer(service="sample_resolver") logger = Logger(service="sample_resolver") app = AppSyncResolver() class MyCustomModel(AppSyncResolverEvent): @property def country_viewer(self) -> str: return self.request_headers.get("cloudfront-viewer-country") @app.resolver(field_name="listLocations") @app.resolver(field_name="locations") def get_locations(name: str, description: str = ""): if app.current_event.country_viewer == "US": ... return name + description @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER) @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context, data_model=MyCustomModel) ``` Returns ------- Any Returns the result of the resolver Raises ------- ValueError If we could not find a field resolver """ self.lambda_context = context Router.lambda_context = context if isinstance(event, list): Router.current_batch_event = [data_model(e) for e in event] response = self._call_batch_resolver(event=event, data_model=data_model) else: Router.current_event = data_model(event) response = self._call_single_resolver(event=event, data_model=data_model) self.clear_context() return response def _call_single_resolver(self, event: dict, data_model: Type[AppSyncResolverEvent]) -> Any: """Call single event resolver Parameters ---------- event : dict Event data_model : Type[AppSyncResolverEvent] Data_model to decode AppSync event, by default it is of AppSyncResolverEvent type or subclass of it """ logger.debug("Processing direct resolver event") self.current_event = data_model(event) resolver = self._resolver_registry.find_resolver(self.current_event.type_name, self.current_event.field_name) if not resolver: raise ValueError(f"No resolver found for '{self.current_event.type_name}.{self.current_event.field_name}'") return resolver["func"](**self.current_event.arguments) def _call_sync_batch_resolver( self, resolver: Callable, raise_on_error: bool = False, aggregate: bool = True, ) -> List[Any]: """ Calls a synchronous batch resolver function for each event in the current batch. Parameters ---------- resolver: Callable The callable function to resolve events. raise_on_error: bool A flag indicating whether to raise an error when processing batches with failed items. Defaults to False, which means errors are handled without raising exceptions. aggregate: bool A flag indicating whether the batch items should be processed at once or individually. If True (default), the batch resolver will process all items in the batch as a single event. If False, the batch resolver will process each item in the batch individually. Returns ------- List[Any] A list of results corresponding to the resolved events. """ logger.debug(f"Graceful error handling flag {raise_on_error=}") # Checks whether the entire batch should be processed at once if aggregate: # Process the entire batch response = resolver(event=self.current_batch_event) if not isinstance(response, List): raise InvalidBatchResponse("The response must be a List when using batch resolvers") return response # Non aggregated events, so we call this event list x times # Stop on first exception we encounter if raise_on_error: return [ resolver(event=appconfig_event, **appconfig_event.arguments) for appconfig_event in self.current_batch_event ] # By default, we gracefully append `None` for any records that failed processing results = [] for idx, event in enumerate(self.current_batch_event): try: results.append(resolver(event=event, **event.arguments)) except Exception: logger.debug(f"Failed to process event number {idx} from field '{event.info.field_name}'") results.append(None) return results async def _call_async_batch_resolver( self, resolver: Callable, raise_on_error: bool = False, aggregate: bool = True, ) -> List[Any]: """ Asynchronously call a batch resolver for each event in the current batch. Parameters ---------- resolver: Callable The asynchronous resolver function. raise_on_error: bool A flag indicating whether to raise an error when processing batches with failed items. Defaults to False, which means errors are handled without raising exceptions. aggregate: bool A flag indicating whether the batch items should be processed at once or individually. If True (default), the batch resolver will process all items in the batch as a single event. If False, the batch resolver will process each item in the batch individually. Returns ------- List[Any] A list of results corresponding to the resolved events. """ logger.debug(f"Graceful error handling flag {raise_on_error=}") # Checks whether the entire batch should be processed at once if aggregate: # Process the entire batch ret = await resolver(event=self.current_batch_event) if not isinstance(ret, List): raise InvalidBatchResponse("The response must be a List when using batch resolvers") return ret response: List = [] # Prime coroutines tasks = [resolver(event=e, **e.arguments) for e in self.current_batch_event] # Aggregate results or raise at first error if raise_on_error: response.extend(await asyncio.gather(*tasks)) return response # Aggregate results and exceptions, then filter them out # Use `None` upon exception for graceful error handling at GraphQL engine level # # NOTE: asyncio.gather(return_exceptions=True) catches and includes exceptions in the results # this will become useful when we support exception handling in AppSync resolver results = await asyncio.gather(*tasks, return_exceptions=True) response.extend(None if isinstance(ret, Exception) else ret for ret in results) return response def _call_batch_resolver(self, event: List[dict], data_model: Type[AppSyncResolverEvent]) -> List[Any]: """Call batch event resolver for sync and async methods Parameters ---------- event : List[dict] Batch event data_model : Type[AppSyncResolverEvent] Data_model to decode AppSync event, by default AppSyncResolverEvent or a subclass Returns ------- List[Any] Results of the resolver execution. Raises ------ InconsistentPayloadError: When all events in the batch do not have the same fieldName. ResolverNotFoundError: When no resolver is found for the specified type and field. """ logger.debug("Processing batch resolver event") self.current_batch_event = [data_model(e) for e in event] type_name, field_name = self.current_batch_event[0].type_name, self.current_batch_event[0].field_name resolver = self._batch_resolver_registry.find_resolver(type_name, field_name) async_resolver = self._async_batch_resolver_registry.find_resolver(type_name, field_name) if resolver and async_resolver: warnings.warn( f"Both synchronous and asynchronous resolvers found for the same event and field." f"The synchronous resolver takes precedence. Executing: {resolver['func'].__name__}", stacklevel=2, category=PowertoolsUserWarning, ) if resolver: logger.debug(f"Found sync resolver. {resolver=}, {field_name=}") return self._call_sync_batch_resolver( resolver=resolver["func"], raise_on_error=resolver["raise_on_error"], aggregate=resolver["aggregate"], ) if async_resolver: logger.debug(f"Found async resolver. {resolver=}, {field_name=}") return asyncio.run( self._call_async_batch_resolver( resolver=async_resolver["func"], raise_on_error=async_resolver["raise_on_error"], aggregate=async_resolver["aggregate"], ), ) raise ResolverNotFoundError(f"No resolver found for '{type_name}.{field_name}'") def include_router(self, router: "Router") -> None: """Adds all resolvers defined in a router Parameters ---------- router : Router A router containing a dict of field resolvers """ # Merge app and router context logger.debug("Merging router and app context") self.context.update(**router.context) # use pointer to allow context clearance after event is processed e.g., resolve(evt, ctx) router.context = self.context logger.debug("Merging router resolver registries") self._resolver_registry.merge(router._resolver_registry) self._batch_resolver_registry.merge(router._batch_resolver_registry) self._async_batch_resolver_registry.merge(router._async_batch_resolver_registry) def resolver(self, type_name: str = "*", field_name: Optional[str] = None) -> Callable: """Registers direct resolver function for GraphQL type and field name. Parameters ---------- type_name : str, optional GraphQL type e.g., Query, Mutation, by default "*" meaning any field_name : Optional[str], optional GraphQL field e.g., getTodo, createTodo, by default None Returns ------- Callable Registered resolver Example ------- ```python from aws_lambda_powertools.event_handler import AppSyncResolver from typing import TypedDict app = AppSyncResolver() class Todo(TypedDict, total=False): id: str userId: str title: str completed: bool # resolve any GraphQL `getTodo` queries # arguments are injected as function arguments as-is @app.resolver(type_name="Query", field_name="getTodo") def get_todo(id: str = "", status: str = "open") -> Todo: todos: Response = requests.get(f"https://jsonplaceholder.typicode.com/todos/{id}") todos.raise_for_status() return todos.json() def lambda_handler(event, context): return app.resolve(event, context) ``` """ return self._resolver_registry.register(field_name=field_name, type_name=type_name) def batch_resolver( self, type_name: str = "*", field_name: Optional[str] = None, raise_on_error: bool = False, aggregate: bool = True, ) -> Callable: """Registers batch resolver function for GraphQL type and field name. By default, we handle errors gracefully by returning `None`. If you want to short-circuit and fail the entire batch use `raise_on_error=True`. Parameters ---------- type_name : str, optional GraphQL type e.g., Query, Mutation, by default "*" meaning any field_name : Optional[str], optional GraphQL field e.g., getTodo, createTodo, by default None raise_on_error : bool, optional Whether to fail entire batch upon error, or handle errors gracefully (None), by default False aggregate: bool A flag indicating whether the batch items should be processed at once or individually. If True (default), the batch resolver will process all items in the batch as a single event. If False, the batch resolver will process each item in the batch individually. Returns ------- Callable Registered resolver """ return self._batch_resolver_registry.register( field_name=field_name, type_name=type_name, raise_on_error=raise_on_error, aggregate=aggregate, ) def async_batch_resolver( self, type_name: str = "*", field_name: Optional[str] = None, raise_on_error: bool = False, aggregate: bool = True, ) -> Callable: return self._async_batch_resolver_registry.register( field_name=field_name, type_name=type_name, raise_on_error=raise_on_error, aggregate=aggregate, )
Ancestors
- Router
- BaseRouter
- abc.ABC
Methods
def batch_resolver(self, type_name: str = '*', field_name: Optional[str] = None, raise_on_error: bool = False, aggregate: bool = True) ‑> Callable
-
Registers batch resolver function for GraphQL type and field name.
By default, we handle errors gracefully by returning
None
. If you want to short-circuit and fail the entire batch useraise_on_error=True
.Parameters
type_name
:str
, optional- GraphQL type e.g., Query, Mutation, by default "*" meaning any
field_name
:Optional[str]
, optional- GraphQL field e.g., getTodo, createTodo, by default None
raise_on_error
:bool
, optional- Whether to fail entire batch upon error, or handle errors gracefully (None), by default False
aggregate
:bool
- A flag indicating whether the batch items should be processed at once or individually. If True (default), the batch resolver will process all items in the batch as a single event. If False, the batch resolver will process each item in the batch individually.
Returns
Callable
- Registered resolver
def include_router(self, router: Router) ‑> None
-
Adds all resolvers defined in a router
Parameters
router
:Router
- A router containing a dict of field resolvers
def resolve(self, event: Union[dict, List[Dict]], context: LambdaContext, data_model: Type[AppSyncResolverEvent] = aws_lambda_powertools.utilities.data_classes.appsync_resolver_event.AppSyncResolverEvent) ‑> Any
-
Resolves the response based on the provide event and decorator routes
Parameters
event
:dict | List[Dict]
- Lambda event either coming from batch processing endpoint or from standard processing endpoint
context
:LambdaContext
- Lambda context
data_model: Your data data_model to decode AppSync event, by default AppSyncResolverEvent
Example
from aws_lambda_powertools.event_handler import AppSyncResolver from aws_lambda_powertools.utilities.typing import LambdaContext @app.resolver(field_name="createSomething") def create_something(id: str): # noqa AA03 VNE003 return id def handler(event, context: LambdaContext): return app.resolve(event, context)
Bringing custom models
from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.logging import correlation_paths from aws_lambda_powertools.event_handler import AppSyncResolver tracer = Tracer(service="sample_resolver") logger = Logger(service="sample_resolver") app = AppSyncResolver() class MyCustomModel(AppSyncResolverEvent): @property def country_viewer(self) -> str: return self.request_headers.get("cloudfront-viewer-country") @app.resolver(field_name="listLocations") @app.resolver(field_name="locations") def get_locations(name: str, description: str = ""): if app.current_event.country_viewer == "US": ... return name + description @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER) @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context, data_model=MyCustomModel)
Returns
Any
- Returns the result of the resolver
Raises
ValueError
- If we could not find a field resolver
def resolver(self, type_name: str = '*', field_name: Optional[str] = None) ‑> Callable
-
Registers direct resolver function for GraphQL type and field name.
Parameters
type_name
:str
, optional- GraphQL type e.g., Query, Mutation, by default "*" meaning any
field_name
:Optional[str]
, optional- GraphQL field e.g., getTodo, createTodo, by default None
Returns
Callable
- Registered resolver
Example
from aws_lambda_powertools.event_handler import AppSyncResolver from typing import TypedDict app = AppSyncResolver() class Todo(TypedDict, total=False): id: str userId: str title: str completed: bool # resolve any GraphQL `getTodo` queries # arguments are injected as function arguments as-is @app.resolver(type_name="Query", field_name="getTodo") def get_todo(id: str = "", status: str = "open") -> Todo: todos: Response = requests.get(f"https://jsonplaceholder.typicode.com/todos/{id}") todos.raise_for_status() return todos.json() def lambda_handler(event, context): return app.resolve(event, context)
Inherited members