Module aws_lambda_powertools.event_handler
Event handler decorators for common Lambda events
Sub-modules
aws_lambda_powertools.event_handler.api_gateway
aws_lambda_powertools.event_handler.appsync
aws_lambda_powertools.event_handler.bedrock_agent
aws_lambda_powertools.event_handler.content_types
aws_lambda_powertools.event_handler.exceptions
aws_lambda_powertools.event_handler.graphql_appsync
aws_lambda_powertools.event_handler.lambda_function_url
aws_lambda_powertools.event_handler.middlewares
aws_lambda_powertools.event_handler.openapi
aws_lambda_powertools.event_handler.router
aws_lambda_powertools.event_handler.types
aws_lambda_powertools.event_handler.util
aws_lambda_powertools.event_handler.vpc_lattice
Classes
class ALBResolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)
-
API Gateway and ALB proxy resolver
Examples
Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator
from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import APIGatewayRestResolver tracer = Tracer() app = APIGatewayRestResolver() @app.get("/get-call") def simple_get(): return {"message": "Foo"} @app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data["value"]} @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context)
Amazon Application Load Balancer (ALB) resolver
Expand source code
class ALBResolver(ApiGatewayResolver): current_event: ALBEvent def __init__( self, cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False, ): """Amazon Application Load Balancer (ALB) resolver""" super().__init__(ProxyEventType.ALBEvent, cors, debug, serializer, strip_prefixes, enable_validation) def _get_base_path(self) -> str: # ALB doesn't have a stage variable, so we just return an empty string return "" @override def _to_response(self, result: dict | tuple | Response) -> Response: """Convert the route's result to a Response ALB requires a non-null body otherwise it converts as HTTP 5xx 3 main result types are supported: - Dict[str, Any]: Rest api response with just the Dict to json stringify and content-type is set to application/json - Tuple[dict, int]: Same dict handling as above but with the option of including a status code - Response: returned as is, and allows for more flexibility """ # NOTE: Minor override for early return on Response with null body for ALB if isinstance(result, Response) and result.body is None: logger.debug("ALB doesn't allow None responses; converting to empty string") result.body = "" return super()._to_response(result)
Ancestors
- ApiGatewayResolver
- BaseRouter
- abc.ABC
Class variables
var current_event : ALBEvent
Inherited members
class APIGatewayHttpResolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)
-
API Gateway and ALB proxy resolver
Examples
Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator
from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import APIGatewayRestResolver tracer = Tracer() app = APIGatewayRestResolver() @app.get("/get-call") def simple_get(): return {"message": "Foo"} @app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data["value"]} @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context)
Amazon API Gateway HTTP API v2 payload resolver
Expand source code
class APIGatewayHttpResolver(ApiGatewayResolver): current_event: APIGatewayProxyEventV2 def __init__( self, cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False, ): """Amazon API Gateway HTTP API v2 payload resolver""" super().__init__( ProxyEventType.APIGatewayProxyEventV2, cors, debug, serializer, strip_prefixes, enable_validation, ) def _get_base_path(self) -> str: # 3 different scenarios: # # 1. SAM local: even though a stage variable is sent to the Lambda function, it's not used in the path # 2. API Gateway HTTP API: stage variable is used in the path # 3. API Gateway HTTP Custom Domain: stage variable is not used in the path # # To solve the 3 scenarios, we try to match the beginning of the path with the stage variable stage = self.current_event.request_context.stage if stage and stage != "$default" and self.current_event.request_context.http.path.startswith(f"/{stage}"): return f"/{stage}" return ""
Ancestors
- ApiGatewayResolver
- BaseRouter
- abc.ABC
Class variables
var current_event : APIGatewayProxyEventV2
Inherited members
class APIGatewayRestResolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)
-
API Gateway and ALB proxy resolver
Examples
Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator
from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import APIGatewayRestResolver tracer = Tracer() app = APIGatewayRestResolver() @app.get("/get-call") def simple_get(): return {"message": "Foo"} @app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data["value"]} @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context)
Amazon API Gateway REST and HTTP API v1 payload resolver
Expand source code
class APIGatewayRestResolver(ApiGatewayResolver): current_event: APIGatewayProxyEvent def __init__( self, cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False, ): """Amazon API Gateway REST and HTTP API v1 payload resolver""" super().__init__( ProxyEventType.APIGatewayProxyEvent, cors, debug, serializer, strip_prefixes, enable_validation, ) def _get_base_path(self) -> str: # 3 different scenarios: # # 1. SAM local: even though a stage variable is sent to the Lambda function, it's not used in the path # 2. API Gateway REST API: stage variable is used in the path # 3. API Gateway REST Custom Domain: stage variable is not used in the path # # To solve the 3 scenarios, we try to match the beginning of the path with the stage variable stage = self.current_event.request_context.stage if stage and stage != "$default" and self.current_event.request_context.path.startswith(f"/{stage}"): return f"/{stage}" return "" # override route to ignore trailing "/" in routes for REST API def route( self, rule: str, method: str | list[str] | tuple[str], cors: bool | None = None, compress: bool = False, cache_control: str | None = None, summary: str | None = None, description: str | None = None, responses: dict[int, OpenAPIResponse] | None = None, response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION, tags: list[str] | None = None, operation_id: str | None = None, include_in_schema: bool = True, security: list[dict[str, list[str]]] | None = None, openapi_extensions: dict[str, Any] | None = None, deprecated: bool = False, middlewares: list[Callable[..., Any]] | None = None, ) -> Callable[[AnyCallableT], AnyCallableT]: # NOTE: see #1552 for more context. return super().route( rule.rstrip("/"), method, cors, compress, cache_control, summary, description, responses, response_description, tags, operation_id, include_in_schema, security, openapi_extensions, deprecated, middlewares, ) # Override _compile_regex to exclude trailing slashes for route resolution @staticmethod def _compile_regex(rule: str, base_regex: str = _ROUTE_REGEX): return super(APIGatewayRestResolver, APIGatewayRestResolver)._compile_regex(rule, "^{}/*$")
Ancestors
- ApiGatewayResolver
- BaseRouter
- abc.ABC
Class variables
var current_event : APIGatewayProxyEvent
Inherited members
class ApiGatewayResolver (proxy_type: Enum = ProxyEventType.APIGatewayProxyEvent, cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)
-
API Gateway and ALB proxy resolver
Examples
Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator
from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import APIGatewayRestResolver tracer = Tracer() app = APIGatewayRestResolver() @app.get("/get-call") def simple_get(): return {"message": "Foo"} @app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data["value"]} @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context)
Parameters
proxy_type
:ProxyEventType
- Proxy request type, defaults to API Gateway V1
cors
:CORSConfig
- Optionally configure and enabled CORS. Not each route will need to have to cors=True
debug
:bool | None
- Enables debug mode, by default False. Can be also be enabled by "POWERTOOLS_DEV" environment variable
serializer
:Callable
, optional- function to serialize
obj
to a JSON formattedstr
, by default json.dumps strip_prefixes
:list[str | Pattern]
, optional- optional list of prefixes to be removed from the request path before doing the routing. This is often used with api gateways with multiple custom mappings. Each prefix can be a static string or a compiled regex pattern
enable_validation
:bool | None
- Enables validation of the request body against the route schema, by default False.
Expand source code
class ApiGatewayResolver(BaseRouter): """API Gateway and ALB proxy resolver Examples -------- Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator ```python from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import APIGatewayRestResolver tracer = Tracer() app = APIGatewayRestResolver() @app.get("/get-call") def simple_get(): return {"message": "Foo"} @app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data["value"]} @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context) ``` """ def __init__( self, proxy_type: Enum = ProxyEventType.APIGatewayProxyEvent, cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False, ): """ Parameters ---------- proxy_type: ProxyEventType Proxy request type, defaults to API Gateway V1 cors: CORSConfig Optionally configure and enabled CORS. Not each route will need to have to cors=True debug: bool | None Enables debug mode, by default False. Can be also be enabled by "POWERTOOLS_DEV" environment variable serializer: Callable, optional function to serialize `obj` to a JSON formatted `str`, by default json.dumps strip_prefixes: list[str | Pattern], optional optional list of prefixes to be removed from the request path before doing the routing. This is often used with api gateways with multiple custom mappings. Each prefix can be a static string or a compiled regex pattern enable_validation: bool | None Enables validation of the request body against the route schema, by default False. """ self._proxy_type = proxy_type self._dynamic_routes: list[Route] = [] self._static_routes: list[Route] = [] self._route_keys: list[str] = [] self._exception_handlers: dict[type, Callable] = {} self._cors = cors self._cors_enabled: bool = cors is not None self._cors_methods: set[str] = {"OPTIONS"} self._debug = self._has_debug(debug) self._enable_validation = enable_validation self._strip_prefixes = strip_prefixes self.context: dict = {} # early init as customers might add context before event resolution self.processed_stack_frames = [] self._response_builder_class = ResponseBuilder[BaseProxyEvent] # Allow for a custom serializer or a concise json serialization self._serializer = serializer or partial(json.dumps, separators=(",", ":"), cls=Encoder) if self._enable_validation: from aws_lambda_powertools.event_handler.middlewares.openapi_validation import OpenAPIValidationMiddleware # Note the serializer argument: only use custom serializer if provided by the caller # Otherwise, fully rely on the internal Pydantic based mechanism to serialize responses for validation. self.use([OpenAPIValidationMiddleware(validation_serializer=serializer)]) def get_openapi_schema( self, *, title: str = "Powertools API", version: str = DEFAULT_API_VERSION, openapi_version: str = DEFAULT_OPENAPI_VERSION, summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None, openapi_extensions: dict[str, Any] | None = None, ) -> OpenAPI: """ Returns the OpenAPI schema as a pydantic model. Parameters ---------- title: str The title of the application. version: str The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API openapi_version: str, default = "3.0.0" The version of the OpenAPI Specification (which the document uses). summary: str, optional A short summary of what the application does. description: str, optional A verbose explanation of the application behavior. tags: list[Tag | str], optional A list of tags used by the specification with additional metadata. servers: list[Server], optional An array of Server Objects, which provide connectivity information to a target server. terms_of_service: str, optional A URL to the Terms of Service for the API. MUST be in the format of a URL. contact: Contact, optional The contact information for the exposed API. license_info: License, optional The license information for the exposed API. security_schemes: dict[str, SecurityScheme]], optional A declaration of the security schemes available to be used in the specification. security: list[dict[str, list[str]]], optional A declaration of which security mechanisms are applied globally across the API. openapi_extensions: Dict[str, Any], optional Additional OpenAPI extensions as a dictionary. Returns ------- OpenAPI: pydantic model The OpenAPI schema as a pydantic model. """ from aws_lambda_powertools.event_handler.openapi.compat import ( GenerateJsonSchema, get_compat_model_name_map, get_definitions, ) from aws_lambda_powertools.event_handler.openapi.models import OpenAPI, PathItem, Tag from aws_lambda_powertools.event_handler.openapi.types import ( COMPONENT_REF_TEMPLATE, ) openapi_version = self._determine_openapi_version(openapi_version) # Start with the bare minimum required for a valid OpenAPI schema info: dict[str, Any] = {"title": title, "version": version} optional_fields = { "summary": summary, "description": description, "termsOfService": terms_of_service, "contact": contact, "license": license_info, } info.update({field: value for field, value in optional_fields.items() if value}) if not isinstance(openapi_extensions, dict): openapi_extensions = {} output: dict[str, Any] = { "openapi": openapi_version, "info": info, "servers": self._get_openapi_servers(servers), "security": self._get_openapi_security(security, security_schemes), **openapi_extensions, } components: dict[str, dict[str, Any]] = {} paths: dict[str, dict[str, Any]] = {} operation_ids: set[str] = set() all_routes = self._dynamic_routes + self._static_routes all_fields = self._get_fields_from_routes(all_routes) model_name_map = get_compat_model_name_map(all_fields) # Collect all models and definitions schema_generator = GenerateJsonSchema(ref_template=COMPONENT_REF_TEMPLATE) field_mapping, definitions = get_definitions( fields=all_fields, schema_generator=schema_generator, model_name_map=model_name_map, ) # Add routes to the OpenAPI schema for route in all_routes: if route.security and not _validate_openapi_security_parameters( security=route.security, security_schemes=security_schemes, ): raise SchemaValidationError( "Security configuration was not found in security_schemas or security_schema was not defined. " "See: https://docs.powertools.aws.dev/lambda/python/latest/core/event_handler/api_gateway/#security-schemes", ) if not route.include_in_schema: continue result = route._get_openapi_path( dependant=route.dependant, operation_ids=operation_ids, model_name_map=model_name_map, field_mapping=field_mapping, ) if result: path, path_definitions = result if path: paths.setdefault(route.openapi_path, {}).update(path) if path_definitions: definitions.update(path_definitions) if definitions: components["schemas"] = {k: definitions[k] for k in sorted(definitions)} if security_schemes: components["securitySchemes"] = security_schemes if components: output["components"] = components if tags: output["tags"] = [Tag(name=tag) if isinstance(tag, str) else tag for tag in tags] output["paths"] = {k: PathItem(**v) for k, v in paths.items()} return OpenAPI(**output) @staticmethod def _get_openapi_servers(servers: list[Server] | None) -> list[Server]: from aws_lambda_powertools.event_handler.openapi.models import Server # If the 'servers' property is not provided or is an empty array, # the default behavior is to return a Server Object with a URL value of "/". return servers if servers else [Server(url="/")] @staticmethod def _get_openapi_security( security: list[dict[str, list[str]]] | None, security_schemes: dict[str, SecurityScheme] | None, ) -> list[dict[str, list[str]]] | None: if not security: return None if not _validate_openapi_security_parameters(security=security, security_schemes=security_schemes): raise SchemaValidationError( "Security configuration was not found in security_schemas or security_schema was not defined. " "See: https://docs.powertools.aws.dev/lambda/python/latest/core/event_handler/api_gateway/#security-schemes", ) return security @staticmethod def _determine_openapi_version(openapi_version: str): # Pydantic V2 has no support for OpenAPI schema 3.0 if not openapi_version.startswith("3.1"): warnings.warn( "You are using Pydantic v2, which is incompatible with OpenAPI schema 3.0. Forcing OpenAPI 3.1", stacklevel=2, ) openapi_version = "3.1.0" return openapi_version def get_openapi_json_schema( self, *, title: str = "Powertools API", version: str = DEFAULT_API_VERSION, openapi_version: str = DEFAULT_OPENAPI_VERSION, summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None, openapi_extensions: dict[str, Any] | None = None, ) -> str: """ Returns the OpenAPI schema as a JSON serializable dict Parameters ---------- title: str The title of the application. version: str The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API openapi_version: str, default = "3.0.0" The version of the OpenAPI Specification (which the document uses). summary: str, optional A short summary of what the application does. description: str, optional A verbose explanation of the application behavior. tags: list[Tag, str], optional A list of tags used by the specification with additional metadata. servers: list[Server], optional An array of Server Objects, which provide connectivity information to a target server. terms_of_service: str, optional A URL to the Terms of Service for the API. MUST be in the format of a URL. contact: Contact, optional The contact information for the exposed API. license_info: License, optional The license information for the exposed API. security_schemes: dict[str, SecurityScheme]], optional A declaration of the security schemes available to be used in the specification. security: list[dict[str, list[str]]], optional A declaration of which security mechanisms are applied globally across the API. openapi_extensions: Dict[str, Any], optional Additional OpenAPI extensions as a dictionary. Returns ------- str The OpenAPI schema as a JSON serializable dict. """ from aws_lambda_powertools.event_handler.openapi.compat import model_json return model_json( self.get_openapi_schema( title=title, version=version, openapi_version=openapi_version, summary=summary, description=description, tags=tags, servers=servers, terms_of_service=terms_of_service, contact=contact, license_info=license_info, security_schemes=security_schemes, security=security, openapi_extensions=openapi_extensions, ), by_alias=True, exclude_none=True, indent=2, ) def enable_swagger( self, *, path: str = "/swagger", title: str = "Powertools for AWS Lambda (Python) API", version: str = DEFAULT_API_VERSION, openapi_version: str = DEFAULT_OPENAPI_VERSION, summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, swagger_base_url: str | None = None, middlewares: list[Callable[..., Response]] | None = None, compress: bool = False, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None, oauth2_config: OAuth2Config | None = None, persist_authorization: bool = False, openapi_extensions: dict[str, Any] | None = None, ): """ Returns the OpenAPI schema as a JSON serializable dict Parameters ---------- path: str, default = "/swagger" The path to the swagger UI. title: str The title of the application. version: str The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API openapi_version: str, default = "3.0.0" The version of the OpenAPI Specification (which the document uses). summary: str, optional A short summary of what the application does. description: str, optional A verbose explanation of the application behavior. tags: list[Tag, str], optional A list of tags used by the specification with additional metadata. servers: list[Server], optional An array of Server Objects, which provide connectivity information to a target server. terms_of_service: str, optional A URL to the Terms of Service for the API. MUST be in the format of a URL. contact: Contact, optional The contact information for the exposed API. license_info: License, optional The license information for the exposed API. swagger_base_url: str, optional The base url for the swagger UI. If not provided, we will serve a recent version of the Swagger UI. middlewares: list[Callable[..., Response]], optional List of middlewares to be used for the swagger route. compress: bool, default = False Whether or not to enable gzip compression swagger route. security_schemes: dict[str, "SecurityScheme"], optional A declaration of the security schemes available to be used in the specification. security: list[dict[str, list[str]]], optional A declaration of which security mechanisms are applied globally across the API. oauth2_config: OAuth2Config, optional The OAuth2 configuration for the Swagger UI. persist_authorization: bool, optional Whether to persist authorization data on browser close/refresh. openapi_extensions: dict[str, Any], optional Additional OpenAPI extensions as a dictionary. """ from aws_lambda_powertools.event_handler.openapi.compat import model_json from aws_lambda_powertools.event_handler.openapi.models import Server from aws_lambda_powertools.event_handler.openapi.swagger_ui import ( generate_oauth2_redirect_html, generate_swagger_html, ) @self.get(path, middlewares=middlewares, include_in_schema=False, compress=compress) def swagger_handler(): query_params = self.current_event.query_string_parameters or {} # Check for query parameters; if "format" is specified as "oauth2-redirect", # send the oauth2-redirect HTML stanza so OAuth2 can be used # Source: https://github.com/swagger-api/swagger-ui/blob/master/dist/oauth2-redirect.html if query_params.get("format") == "oauth2-redirect": return Response( status_code=200, content_type="text/html", body=generate_oauth2_redirect_html(), ) base_path = self._get_base_path() if swagger_base_url: swagger_js = f"{swagger_base_url}/swagger-ui-bundle.min.js" swagger_css = f"{swagger_base_url}/swagger-ui.min.css" else: # We now inject CSS and JS into the SwaggerUI file swagger_js = Path.open( Path(__file__).parent / "openapi" / "swagger_ui" / "swagger-ui-bundle.min.js", ).read() swagger_css = Path.open(Path(__file__).parent / "openapi" / "swagger_ui" / "swagger-ui.min.css").read() openapi_servers = servers or [Server(url=(base_path or "/"))] spec = self.get_openapi_schema( title=title, version=version, openapi_version=openapi_version, summary=summary, description=description, tags=tags, servers=openapi_servers, terms_of_service=terms_of_service, contact=contact, license_info=license_info, security_schemes=security_schemes, security=security, openapi_extensions=openapi_extensions, ) # The .replace('</', '<\\/') part is necessary to prevent a potential issue where the JSON string contains # </script> or similar tags. Escaping the forward slash in </ as <\/ ensures that the JSON does not # inadvertently close the script tag, and the JSON remains a valid string within the JavaScript code. escaped_spec = model_json( spec, by_alias=True, exclude_none=True, indent=2, ).replace("</", "<\\/") # Check for query parameters; if "format" is specified as "json", # respond with the JSON used in the OpenAPI spec # Example: https://www.example.com/swagger?format=json if query_params.get("format") == "json": return Response( status_code=200, content_type="application/json", body=escaped_spec, ) body = generate_swagger_html( escaped_spec, swagger_js, swagger_css, swagger_base_url, oauth2_config, persist_authorization, ) return Response( status_code=200, content_type="text/html", body=body, ) def route( self, rule: str, method: str | list[str] | tuple[str], cors: bool | None = None, compress: bool = False, cache_control: str | None = None, summary: str | None = None, description: str | None = None, responses: dict[int, OpenAPIResponse] | None = None, response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION, tags: list[str] | None = None, operation_id: str | None = None, include_in_schema: bool = True, security: list[dict[str, list[str]]] | None = None, openapi_extensions: dict[str, Any] | None = None, deprecated: bool = False, middlewares: list[Callable[..., Any]] | None = None, ) -> Callable[[AnyCallableT], AnyCallableT]: """Route decorator includes parameter `method`""" def register_resolver(func: AnyCallableT) -> AnyCallableT: methods = (method,) if isinstance(method, str) else method logger.debug(f"Adding route using rule {rule} and methods: {','.join(m.upper() for m in methods)}") cors_enabled = self._cors_enabled if cors is None else cors for item in methods: _route = Route( item, rule, self._compile_regex(rule), func, cors_enabled, compress, cache_control, summary, description, responses, response_description, tags, operation_id, include_in_schema, security, openapi_extensions, deprecated, middlewares, ) # The more specific route wins. # We store dynamic (/studies/{studyid}) and static routes (/studies/fetch) separately. # Then attempt a match for static routes before dynamic routes. # This ensures that the most specific route is prioritized and processed first (studies/fetch). if _route.rule.groups > 0: self._dynamic_routes.append(_route) else: self._static_routes.append(_route) self._create_route_key(item, rule) if cors_enabled: logger.debug(f"Registering method {item.upper()} to Allow Methods in CORS") self._cors_methods.add(item.upper()) return func return register_resolver def resolve(self, event: dict[str, Any], context: LambdaContext) -> dict[str, Any]: """Resolves the response based on the provide event and decorator routes ## Internals Request processing chain is triggered by a Route object being called _(`_call_route` -> `__call__`)_: 1. **When a route is matched** 1.1. Exception handlers _(if any exception bubbled up and caught)_ 1.2. Global middlewares _(before, and after on the way back)_ 1.3. Path level middleware _(before, and after on the way back)_ 1.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 1.5. Run actual route 2. **When a route is NOT matched** 2.1. Exception handlers _(if any exception bubbled up and caught)_ 2.2. Global middlewares _(before, and after on the way back)_ 2.3. Path level middleware _(before, and after on the way back)_ 2.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 2.5. Run 404 route handler 3. **When a route is a pre-flight CORS (often not matched)** 3.1. Exception handlers _(if any exception bubbled up and caught)_ 3.2. Global middlewares _(before, and after on the way back)_ 3.3. Path level middleware _(before, and after on the way back)_ 3.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 3.5. Return 204 with appropriate CORS headers 4. **When a route is matched with Data Validation enabled** 4.1. Exception handlers _(if any exception bubbled up and caught)_ 4.2. Data Validation middleware _(before, and after on the way back)_ 4.3. Global middlewares _(before, and after on the way back)_ 4.4. Path level middleware _(before, and after on the way back)_ 4.5. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 4.6. Run actual route Parameters ---------- event: dict[str, Any] Event context: LambdaContext Lambda context Returns ------- dict Returns the dict response """ if isinstance(event, BaseProxyEvent): warnings.warn( "You don't need to serialize event to Event Source Data Class when using Event Handler; " "see issue #1152", stacklevel=2, ) event = event.raw_event if self._debug: print(self._serializer(event)) # Populate router(s) dependencies without keeping a reference to each registered router BaseRouter.current_event = self._to_proxy_event(event) BaseRouter.lambda_context = context response = self._resolve().build(self.current_event, self._cors) # Debug print Processed Middlewares if self._debug: print("\nProcessed Middlewares:") print("======================") print("\n".join(self.processed_stack_frames)) print("======================") self.clear_context() return response def __call__(self, event, context) -> Any: return self.resolve(event, context) def _create_route_key(self, item: str, rule: str): route_key = item + rule if route_key in self._route_keys: warnings.warn( f"A route like this was already registered. method: '{item}' rule: '{rule}'", stacklevel=2, ) self._route_keys.append(route_key) def _get_base_path(self) -> str: raise NotImplementedError() @staticmethod def _has_debug(debug: bool | None = None) -> bool: # It might have been explicitly switched off (debug=False) if debug is not None: return debug return powertools_dev_is_set() @staticmethod def _compile_regex(rule: str, base_regex: str = _ROUTE_REGEX): """Precompile regex pattern Logic ----- 1. Find any dynamic routes defined as <pattern> e.g. @app.get("/accounts/<account_id>") 2. Create a new regex by substituting every dynamic route found as a named group (?P<group>), and match whole words only (word boundary) instead of a greedy match non-greedy example with word boundary rule: '/accounts/<account_id>' regex: r'/accounts/(?P<account_id>\\w+\\b)' value: /accounts/123/some_other_path account_id: 123 greedy example without word boundary regex: r'/accounts/(?P<account_id>.+)' value: /accounts/123/some_other_path account_id: 123/some_other_path 3. Compiles a regex and include start (^) and end ($) in between for an exact match NOTE: See #520 for context """ rule_regex: str = re.sub(_DYNAMIC_ROUTE_PATTERN, _NAMED_GROUP_BOUNDARY_PATTERN, rule) return re.compile(base_regex.format(rule_regex)) def _to_proxy_event(self, event: dict) -> BaseProxyEvent: # noqa: PLR0911 # ignore many returns """Convert the event dict to the corresponding data class""" if self._proxy_type == ProxyEventType.APIGatewayProxyEvent: logger.debug("Converting event to API Gateway REST API contract") return APIGatewayProxyEvent(event) if self._proxy_type == ProxyEventType.APIGatewayProxyEventV2: logger.debug("Converting event to API Gateway HTTP API contract") return APIGatewayProxyEventV2(event) if self._proxy_type == ProxyEventType.BedrockAgentEvent: logger.debug("Converting event to Bedrock Agent contract") return BedrockAgentEvent(event) if self._proxy_type == ProxyEventType.LambdaFunctionUrlEvent: logger.debug("Converting event to Lambda Function URL contract") return LambdaFunctionUrlEvent(event) if self._proxy_type == ProxyEventType.VPCLatticeEvent: logger.debug("Converting event to VPC Lattice contract") return VPCLatticeEvent(event) if self._proxy_type == ProxyEventType.VPCLatticeEventV2: logger.debug("Converting event to VPC LatticeV2 contract") return VPCLatticeEventV2(event) logger.debug("Converting event to ALB contract") return ALBEvent(event) def _resolve(self) -> ResponseBuilder: """Resolves the response or return the not found response""" method = self.current_event.http_method.upper() path = self._remove_prefix(self.current_event.path) registered_routes = self._static_routes + self._dynamic_routes for route in registered_routes: if method != route.method: continue match_results: Match | None = route.rule.match(path) if match_results: logger.debug("Found a registered route. Calling function") # Add matched Route reference into the Resolver context self.append_context(_route=route, _path=path) route_keys = self._convert_matches_into_route_keys(match_results) return self._call_route(route, route_keys) # pass fn args return self._handle_not_found(method=method, path=path) def _remove_prefix(self, path: str) -> str: """Remove the configured prefix from the path""" if not isinstance(self._strip_prefixes, list): return path for prefix in self._strip_prefixes: if isinstance(prefix, str): if path == prefix: return "/" if self._path_starts_with(path, prefix): return path[len(prefix) :] if isinstance(prefix, Pattern): path = re.sub(prefix, "", path) # When using regexes, we might get into a point where everything is removed # from the string, so we check if it's empty and return /, since there's nothing # else to strip anymore. if not path: return "/" return path def _convert_matches_into_route_keys(self, match: Match) -> dict[str, str]: """Converts the regex match into a dict of route keys""" return match.groupdict() @staticmethod def _path_starts_with(path: str, prefix: str): """Returns true if the `path` starts with a prefix plus a `/`""" if not isinstance(prefix, str) or prefix == "": return False return path.startswith(prefix + "/") def _handle_not_found(self, method: str, path: str) -> ResponseBuilder: """Called when no matching route was found and includes support for the cors preflight response""" logger.debug(f"No match found for path {path} and method {method}") def not_found_handler(): """Route handler for 404s It handles in the following order: 1. Pre-flight CORS requests (OPTIONS) 2. Detects and calls custom HTTP 404 handler 3. Returns standard 404 along with CORS headers Returns ------- Response HTTP 404 response """ _headers: dict[str, Any] = {} # Pre-flight request? Return immediately to avoid browser error if self._cors and method == "OPTIONS": logger.debug("Pre-flight request detected. Returning CORS with empty response") _headers["Access-Control-Allow-Methods"] = CORSConfig.build_allow_methods(self._cors_methods) return Response(status_code=204, content_type=None, headers=_headers, body="") # Customer registered 404 route? Call it. custom_not_found_handler = self._lookup_exception_handler(NotFoundError) if custom_not_found_handler: return custom_not_found_handler(NotFoundError()) # No CORS and no custom 404 fn? Default response return Response( status_code=HTTPStatus.NOT_FOUND.value, content_type=content_types.APPLICATION_JSON, headers=_headers, body={"statusCode": HTTPStatus.NOT_FOUND.value, "message": "Not found"}, ) # We create a route to trigger entire request chain (middleware+exception handlers) route = Route( rule=self._compile_regex(r".*"), method=method, path=path, func=not_found_handler, cors=self._cors_enabled, compress=False, ) # Add matched Route reference into the Resolver context self.append_context(_route=route, _path=path) # Kick-off request chain: # -> exception_handlers() # --> middlewares() # ---> not_found_route() return self._call_route(route=route, route_arguments={}) def _call_route(self, route: Route, route_arguments: dict[str, str]) -> ResponseBuilder: """Actually call the matching route with any provided keyword arguments.""" try: # Reset Processed stack for Middleware (for debugging purposes) self._reset_processed_stack() return self._response_builder_class( response=self._to_response( route(router_middlewares=self._router_middlewares, app=self, route_arguments=route_arguments), ), serializer=self._serializer, route=route, ) except Exception as exc: # If exception is handled then return the response builder to reduce noise response_builder = self._call_exception_handler(exc, route) if response_builder: return response_builder logger.exception(exc) if self._debug: # If the user has turned on debug mode, # we'll let the original exception propagate, so # they get more information about what went wrong. return self._response_builder_class( response=Response( status_code=500, content_type=content_types.TEXT_PLAIN, body="".join(traceback.format_exc()), ), serializer=self._serializer, route=route, ) raise def not_found(self, func: Callable | None = None): if func is None: return self.exception_handler(NotFoundError) return self.exception_handler(NotFoundError)(func) def exception_handler(self, exc_class: type[Exception] | list[type[Exception]]): def register_exception_handler(func: Callable): if isinstance(exc_class, list): # pragma: no cover for exp in exc_class: self._exception_handlers[exp] = func else: self._exception_handlers[exc_class] = func return func return register_exception_handler def _lookup_exception_handler(self, exp_type: type) -> Callable | None: # Use "Method Resolution Order" to allow for matching against a base class # of an exception for cls in exp_type.__mro__: if cls in self._exception_handlers: return self._exception_handlers[cls] return None def _call_exception_handler(self, exp: Exception, route: Route) -> ResponseBuilder | None: handler = self._lookup_exception_handler(type(exp)) if handler: try: return self._response_builder_class(response=handler(exp), serializer=self._serializer, route=route) except ServiceError as service_error: exp = service_error if isinstance(exp, RequestValidationError): # For security reasons, we hide msg details (don't leak Python, Pydantic or file names) errors = [{"loc": e["loc"], "type": e["type"]} for e in exp.errors()] return self._response_builder_class( response=Response( status_code=HTTPStatus.UNPROCESSABLE_ENTITY, content_type=content_types.APPLICATION_JSON, body={"statusCode": HTTPStatus.UNPROCESSABLE_ENTITY, "detail": errors}, ), serializer=self._serializer, route=route, ) if isinstance(exp, ServiceError): return self._response_builder_class( response=Response( status_code=exp.status_code, content_type=content_types.APPLICATION_JSON, body={"statusCode": exp.status_code, "message": exp.msg}, ), serializer=self._serializer, route=route, ) return None def _to_response(self, result: dict | tuple | Response) -> Response: """Convert the route's result to a Response 3 main result types are supported: - dict[str, Any]: Rest api response with just the dict to json stringify and content-type is set to application/json - tuple[dict, int]: Same dict handling as above but with the option of including a status code - Response: returned as is, and allows for more flexibility """ status_code = HTTPStatus.OK if isinstance(result, Response): return result elif isinstance(result, tuple) and len(result) == 2: # Unpack result dict and status code from tuple result, status_code = result logger.debug("Simple response detected, serializing return before constructing final response") return Response( status_code=status_code, content_type=content_types.APPLICATION_JSON, body=result, ) def include_router(self, router: Router, prefix: str | None = None) -> None: """Adds all routes and context defined in a router Parameters ---------- router : Router The Router containing a list of routes to be registered after the existing routes prefix : str, optional An optional prefix to be added to the originally defined rule """ # Add reference to parent ApiGatewayResolver to support use cases where people subclass it to add custom logic router.api_resolver = self logger.debug("Merging App context with Router context") self.context.update(**router.context) logger.debug("Appending Router middlewares into App middlewares.") self._router_middlewares = self._router_middlewares + router._router_middlewares logger.debug("Appending Router exception_handler into App exception_handler.") self._exception_handlers.update(router._exception_handlers) # use pointer to allow context clearance after event is processed e.g., resolve(evt, ctx) router.context = self.context # Iterate through the routes defined in the router to configure and apply middlewares for each route for route, func in router._routes.items(): new_route = route if prefix: rule = route[0] rule = prefix if rule == "/" else f"{prefix}{rule}" new_route = (rule, *route[1:]) # Middlewares are stored by route separately - must grab them to include # Middleware store the route without prefix, so we must not include prefix when grabbing middlewares = router._routes_with_middleware.get(route) # Need to use "type: ignore" here since mypy does not like a named parameter after # tuple expansion since may cause duplicate named parameters in the function signature. # In this case this is not possible since the tuple expansion is from a hashable source # and the `middlewares` list is a non-hashable structure so will never be included. # Still need to ignore for mypy checks or will cause failures (false-positive) self.route(*new_route, middlewares=middlewares)(func) # type: ignore @staticmethod def _get_fields_from_routes(routes: Sequence[Route]) -> list[ModelField]: """ Returns a list of fields from the routes """ from aws_lambda_powertools.event_handler.openapi.compat import ModelField from aws_lambda_powertools.event_handler.openapi.dependant import ( get_flat_params, ) body_fields_from_routes: list[ModelField] = [] responses_from_routes: list[ModelField] = [] request_fields_from_routes: list[ModelField] = [] for route in routes: if route.body_field: if not isinstance(route.body_field, ModelField): raise AssertionError("A request body myst be a Pydantic Field") body_fields_from_routes.append(route.body_field) params = get_flat_params(route.dependant) request_fields_from_routes.extend(params) if route.dependant.return_param: responses_from_routes.append(route.dependant.return_param) if route.dependant.response_extra_models: responses_from_routes.extend(route.dependant.response_extra_models) flat_models = list(responses_from_routes + request_fields_from_routes + body_fields_from_routes) return flat_models
Ancestors
- BaseRouter
- abc.ABC
Subclasses
- ALBResolver
- APIGatewayHttpResolver
- APIGatewayRestResolver
- BedrockAgentResolver
- LambdaFunctionUrlResolver
- VPCLatticeResolver
- VPCLatticeV2Resolver
Methods
def enable_swagger(self, *, path: str = '/swagger', title: str = 'Powertools for AWS Lambda (Python) API', version: str = '1.0.0', openapi_version: str = '3.1.0', summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, swagger_base_url: str | None = None, middlewares: list[Callable[..., Response]] | None = None, compress: bool = False, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None, oauth2_config: OAuth2Config | None = None, persist_authorization: bool = False, openapi_extensions: dict[str, Any] | None = None)
-
Returns the OpenAPI schema as a JSON serializable dict
Parameters
path
:str
, default= "/swagger"
- The path to the swagger UI.
title
:str
- The title of the application.
version
:str
- The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
openapi_version
:str
, default= "3.0.0"
- The version of the OpenAPI Specification (which the document uses).
summary
:str
, optional- A short summary of what the application does.
description
:str
, optional- A verbose explanation of the application behavior.
tags
:list[Tag, str]
, optional- A list of tags used by the specification with additional metadata.
servers
:list[Server]
, optional- An array of Server Objects, which provide connectivity information to a target server.
terms_of_service
:str
, optional- A URL to the Terms of Service for the API. MUST be in the format of a URL.
contact
:Contact
, optional- The contact information for the exposed API.
license_info
:License
, optional- The license information for the exposed API.
swagger_base_url
:str
, optional- The base url for the swagger UI. If not provided, we will serve a recent version of the Swagger UI.
middlewares
:list[Callable[…, Response]]
, optional- List of middlewares to be used for the swagger route.
compress
:bool
, default= False
- Whether or not to enable gzip compression swagger route.
security_schemes
:dict[str, "SecurityScheme"]
, optional- A declaration of the security schemes available to be used in the specification.
security
:list[dict[str, list[str]]]
, optional- A declaration of which security mechanisms are applied globally across the API.
oauth2_config
:OAuth2Config
, optional- The OAuth2 configuration for the Swagger UI.
persist_authorization
:bool
, optional- Whether to persist authorization data on browser close/refresh.
openapi_extensions
:dict[str, Any]
, optional- Additional OpenAPI extensions as a dictionary.
def exception_handler(self, exc_class: type[Exception] | list[type[Exception]])
def get_openapi_json_schema(self, *, title: str = 'Powertools API', version: str = '1.0.0', openapi_version: str = '3.1.0', summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None, openapi_extensions: dict[str, Any] | None = None)
-
Returns the OpenAPI schema as a JSON serializable dict
Parameters
title
:str
- The title of the application.
version
:str
- The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
openapi_version
:str
, default= "3.0.0"
- The version of the OpenAPI Specification (which the document uses).
summary
:str
, optional- A short summary of what the application does.
description
:str
, optional- A verbose explanation of the application behavior.
tags
:list[Tag, str]
, optional- A list of tags used by the specification with additional metadata.
servers
:list[Server]
, optional- An array of Server Objects, which provide connectivity information to a target server.
terms_of_service
:str
, optional- A URL to the Terms of Service for the API. MUST be in the format of a URL.
contact
:Contact
, optional- The contact information for the exposed API.
license_info
:License
, optional- The license information for the exposed API.
security_schemes
:dict[str, SecurityScheme]]
, optional- A declaration of the security schemes available to be used in the specification.
security
:list[dict[str, list[str]]]
, optional- A declaration of which security mechanisms are applied globally across the API.
openapi_extensions
:Dict[str, Any]
, optional- Additional OpenAPI extensions as a dictionary.
Returns
str
- The OpenAPI schema as a JSON serializable dict.
def get_openapi_schema(self, *, title: str = 'Powertools API', version: str = '1.0.0', openapi_version: str = '3.1.0', summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None, openapi_extensions: dict[str, Any] | None = None)
-
Returns the OpenAPI schema as a pydantic model.
Parameters
title
:str
- The title of the application.
version
:str
- The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
openapi_version
:str
, default= "3.0.0"
- The version of the OpenAPI Specification (which the document uses).
summary
:str
, optional- A short summary of what the application does.
description
:str
, optional- A verbose explanation of the application behavior.
tags
:list[Tag | str]
, optional- A list of tags used by the specification with additional metadata.
servers
:list[Server]
, optional- An array of Server Objects, which provide connectivity information to a target server.
terms_of_service
:str
, optional- A URL to the Terms of Service for the API. MUST be in the format of a URL.
contact
:Contact
, optional- The contact information for the exposed API.
license_info
:License
, optional- The license information for the exposed API.
security_schemes
:dict[str, SecurityScheme]]
, optional- A declaration of the security schemes available to be used in the specification.
security
:list[dict[str, list[str]]]
, optional- A declaration of which security mechanisms are applied globally across the API.
openapi_extensions
:Dict[str, Any]
, optional- Additional OpenAPI extensions as a dictionary.
Returns
OpenAPI
:pydantic model
- The OpenAPI schema as a pydantic model.
def include_router(self, router: Router, prefix: str | None = None) ‑> None
-
Adds all routes and context defined in a router
Parameters
router
:Router
- The Router containing a list of routes to be registered after the existing routes
prefix
:str
, optional- An optional prefix to be added to the originally defined rule
def not_found(self, func: Callable | None = None)
def resolve(self, event: dict[str, Any], context: LambdaContext)
-
Resolves the response based on the provide event and decorator routes
Internals
Request processing chain is triggered by a Route object being called (
_call_route
->__call__
):- When a route is matched 1.1. Exception handlers (if any exception bubbled up and caught) 1.2. Global middlewares (before, and after on the way back) 1.3. Path level middleware (before, and after on the way back) 1.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 1.5. Run actual route
- When a route is NOT matched 2.1. Exception handlers (if any exception bubbled up and caught) 2.2. Global middlewares (before, and after on the way back) 2.3. Path level middleware (before, and after on the way back) 2.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 2.5. Run 404 route handler
- When a route is a pre-flight CORS (often not matched) 3.1. Exception handlers (if any exception bubbled up and caught) 3.2. Global middlewares (before, and after on the way back) 3.3. Path level middleware (before, and after on the way back) 3.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 3.5. Return 204 with appropriate CORS headers
- When a route is matched with Data Validation enabled 4.1. Exception handlers (if any exception bubbled up and caught) 4.2. Data Validation middleware (before, and after on the way back) 4.3. Global middlewares (before, and after on the way back) 4.4. Path level middleware (before, and after on the way back) 4.5. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 4.6. Run actual route
Parameters
event
:dict[str, Any]
- Event
context
:LambdaContext
- Lambda context
Returns
dict
- Returns the dict response
def route(self, rule: str, method: str | list[str] | tuple[str], cors: bool | None = None, compress: bool = False, cache_control: str | None = None, summary: str | None = None, description: str | None = None, responses: dict[int, OpenAPIResponse] | None = None, response_description: str = 'Successful Response', tags: list[str] | None = None, operation_id: str | None = None, include_in_schema: bool = True, security: list[dict[str, list[str]]] | None = None, openapi_extensions: dict[str, Any] | None = None, deprecated: bool = False, middlewares: list[Callable[..., Any]] | None = None)
-
Route decorator includes parameter
method
Inherited members
class AppSyncResolver
-
AppSync GraphQL API Resolver
Example
from aws_lambda_powertools.event_handler import AppSyncResolver app = AppSyncResolver() @app.resolver(type_name="Query", field_name="listLocations") def list_locations(page: int = 0, size: int = 10) -> list: # Your logic to fetch locations with arguments passed in return [{"id": 100, "name": "Smooth Grooves"}] @app.resolver(type_name="Merchant", field_name="extraInfo") def get_extra_info() -> dict: # Can use "app.current_event.source" to filter within the parent context account_type = app.current_event.source["accountType"] method = "BTC" if account_type == "NEW" else "USD" return {"preferredPaymentMethod": method} @app.resolver(field_name="commonField") def common_field() -> str: # Would match all fieldNames matching 'commonField' return str(uuid.uuid4())
Initialize a new instance of the AppSyncResolver.
Expand source code
class AppSyncResolver(Router): """ AppSync GraphQL API Resolver Example ------- ```python from aws_lambda_powertools.event_handler import AppSyncResolver app = AppSyncResolver() @app.resolver(type_name="Query", field_name="listLocations") def list_locations(page: int = 0, size: int = 10) -> list: # Your logic to fetch locations with arguments passed in return [{"id": 100, "name": "Smooth Grooves"}] @app.resolver(type_name="Merchant", field_name="extraInfo") def get_extra_info() -> dict: # Can use "app.current_event.source" to filter within the parent context account_type = app.current_event.source["accountType"] method = "BTC" if account_type == "NEW" else "USD" return {"preferredPaymentMethod": method} @app.resolver(field_name="commonField") def common_field() -> str: # Would match all fieldNames matching 'commonField' return str(uuid.uuid4()) ``` """ def __init__(self): """ Initialize a new instance of the AppSyncResolver. """ super().__init__() self.context = {} # early init as customers might add context before event resolution self._exception_handlers: dict[type, Callable] = {} def __call__( self, event: dict, context: LambdaContext, data_model: type[AppSyncResolverEvent] = AppSyncResolverEvent, ) -> Any: """Implicit lambda handler which internally calls `resolve`""" return self.resolve(event, context, data_model) def resolve( self, event: dict | list[dict], context: LambdaContext, data_model: type[AppSyncResolverEvent] = AppSyncResolverEvent, ) -> Any: """Resolves the response based on the provide event and decorator routes Parameters ---------- event : dict | list[Dict] Lambda event either coming from batch processing endpoint or from standard processing endpoint context : LambdaContext Lambda context data_model: Your data data_model to decode AppSync event, by default AppSyncResolverEvent Example ------- ```python from aws_lambda_powertools.event_handler import AppSyncResolver from aws_lambda_powertools.utilities.typing import LambdaContext @app.resolver(field_name="createSomething") def create_something(id: str): # noqa AA03 VNE003 return id def handler(event, context: LambdaContext): return app.resolve(event, context) ``` **Bringing custom models** ```python from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.logging import correlation_paths from aws_lambda_powertools.event_handler import AppSyncResolver tracer = Tracer(service="sample_resolver") logger = Logger(service="sample_resolver") app = AppSyncResolver() class MyCustomModel(AppSyncResolverEvent): @property def country_viewer(self) -> str: return self.request_headers.get("cloudfront-viewer-country", "") @app.resolver(field_name="listLocations") @app.resolver(field_name="locations") def get_locations(name: str, description: str = ""): if app.current_event.country_viewer == "US": ... return name + description @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER) @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context, data_model=MyCustomModel) ``` Returns ------- Any Returns the result of the resolver Raises ------- ValueError If we could not find a field resolver """ self.lambda_context = context Router.lambda_context = context try: if isinstance(event, list): Router.current_batch_event = [data_model(e) for e in event] response = self._call_batch_resolver(event=event, data_model=data_model) else: Router.current_event = data_model(event) response = self._call_single_resolver(event=event, data_model=data_model) except Exception as exp: response_builder = self._lookup_exception_handler(type(exp)) if response_builder: return response_builder(exp) raise # We don't clear the context for coroutines because we don't have control over the event loop. # If we clean the context immediately, it might not be available when the coroutine is actually executed. # For single async operations, the context should be cleaned up manually after the coroutine completes. # See: https://github.com/aws-powertools/powertools-lambda-python/issues/5290 # REVIEW: Review this support in Powertools V4 if not asyncio.iscoroutine(response): self.clear_context() return response def _call_single_resolver(self, event: dict, data_model: type[AppSyncResolverEvent]) -> Any: """Call single event resolver Parameters ---------- event : dict Event data_model : type[AppSyncResolverEvent] Data_model to decode AppSync event, by default it is of AppSyncResolverEvent type or subclass of it """ logger.debug("Processing direct resolver event") self.current_event = data_model(event) resolver = self._resolver_registry.find_resolver(self.current_event.type_name, self.current_event.field_name) if not resolver: raise ValueError(f"No resolver found for '{self.current_event.type_name}.{self.current_event.field_name}'") return resolver["func"](**self.current_event.arguments) def _call_sync_batch_resolver( self, resolver: Callable, raise_on_error: bool = False, aggregate: bool = True, ) -> list[Any]: """ Calls a synchronous batch resolver function for each event in the current batch. Parameters ---------- resolver: Callable The callable function to resolve events. raise_on_error: bool A flag indicating whether to raise an error when processing batches with failed items. Defaults to False, which means errors are handled without raising exceptions. aggregate: bool A flag indicating whether the batch items should be processed at once or individually. If True (default), the batch resolver will process all items in the batch as a single event. If False, the batch resolver will process each item in the batch individually. Returns ------- list[Any] A list of results corresponding to the resolved events. """ logger.debug(f"Graceful error handling flag {raise_on_error=}") # Checks whether the entire batch should be processed at once if aggregate: # Process the entire batch response = resolver(event=self.current_batch_event) if not isinstance(response, list): raise InvalidBatchResponse("The response must be a List when using batch resolvers") return response # Non aggregated events, so we call this event list x times # Stop on first exception we encounter if raise_on_error: return [ resolver(event=appconfig_event, **appconfig_event.arguments) for appconfig_event in self.current_batch_event ] # By default, we gracefully append `None` for any records that failed processing results = [] for idx, event in enumerate(self.current_batch_event): try: results.append(resolver(event=event, **event.arguments)) except Exception: logger.debug(f"Failed to process event number {idx} from field '{event.info.field_name}'") results.append(None) return results async def _call_async_batch_resolver( self, resolver: Callable, raise_on_error: bool = False, aggregate: bool = True, ) -> list[Any]: """ Asynchronously call a batch resolver for each event in the current batch. Parameters ---------- resolver: Callable The asynchronous resolver function. raise_on_error: bool A flag indicating whether to raise an error when processing batches with failed items. Defaults to False, which means errors are handled without raising exceptions. aggregate: bool A flag indicating whether the batch items should be processed at once or individually. If True (default), the batch resolver will process all items in the batch as a single event. If False, the batch resolver will process each item in the batch individually. Returns ------- list[Any] A list of results corresponding to the resolved events. """ logger.debug(f"Graceful error handling flag {raise_on_error=}") # Checks whether the entire batch should be processed at once if aggregate: # Process the entire batch ret = await resolver(event=self.current_batch_event) if not isinstance(ret, list): raise InvalidBatchResponse("The response must be a List when using batch resolvers") return ret response: list = [] # Prime coroutines tasks = [resolver(event=e, **e.arguments) for e in self.current_batch_event] # Aggregate results or raise at first error if raise_on_error: response.extend(await asyncio.gather(*tasks)) return response # Aggregate results and exceptions, then filter them out # Use `None` upon exception for graceful error handling at GraphQL engine level # # NOTE: asyncio.gather(return_exceptions=True) catches and includes exceptions in the results # this will become useful when we support exception handling in AppSync resolver results = await asyncio.gather(*tasks, return_exceptions=True) response.extend(None if isinstance(ret, Exception) else ret for ret in results) return response def _call_batch_resolver(self, event: list[dict], data_model: type[AppSyncResolverEvent]) -> list[Any]: """Call batch event resolver for sync and async methods Parameters ---------- event : list[dict] Batch event data_model : type[AppSyncResolverEvent] Data_model to decode AppSync event, by default AppSyncResolverEvent or a subclass Returns ------- list[Any] Results of the resolver execution. Raises ------ InconsistentPayloadError: When all events in the batch do not have the same fieldName. ResolverNotFoundError: When no resolver is found for the specified type and field. """ logger.debug("Processing batch resolver event") self.current_batch_event = [data_model(e) for e in event] type_name, field_name = self.current_batch_event[0].type_name, self.current_batch_event[0].field_name resolver = self._batch_resolver_registry.find_resolver(type_name, field_name) async_resolver = self._async_batch_resolver_registry.find_resolver(type_name, field_name) if resolver and async_resolver: warnings.warn( f"Both synchronous and asynchronous resolvers found for the same event and field." f"The synchronous resolver takes precedence. Executing: {resolver['func'].__name__}", stacklevel=2, category=PowertoolsUserWarning, ) if resolver: logger.debug(f"Found sync resolver. {resolver=}, {field_name=}") return self._call_sync_batch_resolver( resolver=resolver["func"], raise_on_error=resolver["raise_on_error"], aggregate=resolver["aggregate"], ) if async_resolver: logger.debug(f"Found async resolver. {resolver=}, {field_name=}") return asyncio.run( self._call_async_batch_resolver( resolver=async_resolver["func"], raise_on_error=async_resolver["raise_on_error"], aggregate=async_resolver["aggregate"], ), ) raise ResolverNotFoundError(f"No resolver found for '{type_name}.{field_name}'") def include_router(self, router: Router) -> None: """Adds all resolvers defined in a router Parameters ---------- router : Router A router containing a dict of field resolvers """ # Merge app and router context logger.debug("Merging router and app context") self.context.update(**router.context) # use pointer to allow context clearance after event is processed e.g., resolve(evt, ctx) router.context = self.context logger.debug("Merging router resolver registries") self._resolver_registry.merge(router._resolver_registry) self._batch_resolver_registry.merge(router._batch_resolver_registry) self._async_batch_resolver_registry.merge(router._async_batch_resolver_registry) def resolver(self, type_name: str = "*", field_name: str | None = None) -> Callable: """Registers direct resolver function for GraphQL type and field name. Parameters ---------- type_name : str, optional GraphQL type e.g., Query, Mutation, by default "*" meaning any field_name : Optional[str], optional GraphQL field e.g., getTodo, createTodo, by default None Returns ------- Callable Registered resolver Example ------- ```python from aws_lambda_powertools.event_handler import AppSyncResolver from typing import TypedDict app = AppSyncResolver() class Todo(TypedDict, total=False): id: str userId: str title: str completed: bool # resolve any GraphQL `getTodo` queries # arguments are injected as function arguments as-is @app.resolver(type_name="Query", field_name="getTodo") def get_todo(id: str = "", status: str = "open") -> Todo: todos: Response = requests.get(f"https://jsonplaceholder.typicode.com/todos/{id}") todos.raise_for_status() return todos.json() def lambda_handler(event, context): return app.resolve(event, context) ``` """ return self._resolver_registry.register(field_name=field_name, type_name=type_name) def batch_resolver( self, type_name: str = "*", field_name: str | None = None, raise_on_error: bool = False, aggregate: bool = True, ) -> Callable: """Registers batch resolver function for GraphQL type and field name. By default, we handle errors gracefully by returning `None`. If you want to short-circuit and fail the entire batch use `raise_on_error=True`. Parameters ---------- type_name : str, optional GraphQL type e.g., Query, Mutation, by default "*" meaning any field_name : Optional[str], optional GraphQL field e.g., getTodo, createTodo, by default None raise_on_error : bool, optional Whether to fail entire batch upon error, or handle errors gracefully (None), by default False aggregate: bool A flag indicating whether the batch items should be processed at once or individually. If True (default), the batch resolver will process all items in the batch as a single event. If False, the batch resolver will process each item in the batch individually. Returns ------- Callable Registered resolver """ return self._batch_resolver_registry.register( field_name=field_name, type_name=type_name, raise_on_error=raise_on_error, aggregate=aggregate, ) def async_batch_resolver( self, type_name: str = "*", field_name: str | None = None, raise_on_error: bool = False, aggregate: bool = True, ) -> Callable: return self._async_batch_resolver_registry.register( field_name=field_name, type_name=type_name, raise_on_error=raise_on_error, aggregate=aggregate, ) def exception_handler(self, exc_class: type[Exception] | list[type[Exception]]): """ A decorator function that registers a handler for one or more exception types. Parameters ---------- exc_class (type[Exception] | list[type[Exception]]) A single exception type or a list of exception types. Returns ------- Callable: A decorator function that registers the exception handler. """ def register_exception_handler(func: Callable): if isinstance(exc_class, list): # pragma: no cover for exp in exc_class: self._exception_handlers[exp] = func else: self._exception_handlers[exc_class] = func return func return register_exception_handler def _lookup_exception_handler(self, exp_type: type) -> Callable | None: """ Looks up the registered exception handler for the given exception type or its base classes. Parameters ---------- exp_type (type): The exception type to look up the handler for. Returns ------- Callable | None: The registered exception handler function if found, otherwise None. """ for cls in exp_type.__mro__: if cls in self._exception_handlers: return self._exception_handlers[cls] return None
Ancestors
- Router
- BaseRouter
- abc.ABC
Methods
def batch_resolver(self, type_name: str = '*', field_name: str | None = None, raise_on_error: bool = False, aggregate: bool = True) ‑> Callable
-
Registers batch resolver function for GraphQL type and field name.
By default, we handle errors gracefully by returning
None
. If you want to short-circuit and fail the entire batch useraise_on_error=True
.Parameters
type_name
:str
, optional- GraphQL type e.g., Query, Mutation, by default "*" meaning any
field_name
:Optional[str]
, optional- GraphQL field e.g., getTodo, createTodo, by default None
raise_on_error
:bool
, optional- Whether to fail entire batch upon error, or handle errors gracefully (None), by default False
aggregate
:bool
- A flag indicating whether the batch items should be processed at once or individually. If True (default), the batch resolver will process all items in the batch as a single event. If False, the batch resolver will process each item in the batch individually.
Returns
Callable
- Registered resolver
def exception_handler(self, exc_class: type[Exception] | list[type[Exception]])
-
A decorator function that registers a handler for one or more exception types.
Parameters
exc_class (type[Exception] | list[type[Exception]]) A single exception type or a list of exception types.
Returns
Callable
A decorator function that registers the exception handler.
def include_router(self, router: Router) ‑> None
-
Adds all resolvers defined in a router
Parameters
router
:Router
- A router containing a dict of field resolvers
def resolve(self, event: dict | list[dict], context: LambdaContext, data_model: type[AppSyncResolverEvent] = aws_lambda_powertools.utilities.data_classes.appsync_resolver_event.AppSyncResolverEvent)
-
Resolves the response based on the provide event and decorator routes
Parameters
event
:dict | list[Dict]
- Lambda event either coming from batch processing endpoint or from standard processing endpoint
context
:LambdaContext
- Lambda context
data_model: Your data data_model to decode AppSync event, by default AppSyncResolverEvent
Example
from aws_lambda_powertools.event_handler import AppSyncResolver from aws_lambda_powertools.utilities.typing import LambdaContext @app.resolver(field_name="createSomething") def create_something(id: str): # noqa AA03 VNE003 return id def handler(event, context: LambdaContext): return app.resolve(event, context)
Bringing custom models
from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.logging import correlation_paths from aws_lambda_powertools.event_handler import AppSyncResolver tracer = Tracer(service="sample_resolver") logger = Logger(service="sample_resolver") app = AppSyncResolver() class MyCustomModel(AppSyncResolverEvent): @property def country_viewer(self) -> str: return self.request_headers.get("cloudfront-viewer-country", "") @app.resolver(field_name="listLocations") @app.resolver(field_name="locations") def get_locations(name: str, description: str = ""): if app.current_event.country_viewer == "US": ... return name + description @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER) @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context, data_model=MyCustomModel)
Returns
Any
- Returns the result of the resolver
Raises
ValueError
- If we could not find a field resolver
def resolver(self, type_name: str = '*', field_name: str | None = None) ‑> Callable
-
Registers direct resolver function for GraphQL type and field name.
Parameters
type_name
:str
, optional- GraphQL type e.g., Query, Mutation, by default "*" meaning any
field_name
:Optional[str]
, optional- GraphQL field e.g., getTodo, createTodo, by default None
Returns
Callable
- Registered resolver
Example
from aws_lambda_powertools.event_handler import AppSyncResolver from typing import TypedDict app = AppSyncResolver() class Todo(TypedDict, total=False): id: str userId: str title: str completed: bool # resolve any GraphQL `getTodo` queries # arguments are injected as function arguments as-is @app.resolver(type_name="Query", field_name="getTodo") def get_todo(id: str = "", status: str = "open") -> Todo: todos: Response = requests.get(f"https://jsonplaceholder.typicode.com/todos/{id}") todos.raise_for_status() return todos.json() def lambda_handler(event, context): return app.resolve(event, context)
Inherited members
class BedrockAgentResolver (debug: bool = False, enable_validation: bool = True)
-
Bedrock Agent Resolver
See https://aws.amazon.com/bedrock/agents/ for more information.
Examples
Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator
from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import BedrockAgentResolver tracer = Tracer() app = BedrockAgentResolver() @app.get("/claims") def simple_get(): return "You have 3 claims" @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context)
Parameters
proxy_type
:ProxyEventType
- Proxy request type, defaults to API Gateway V1
cors
:CORSConfig
- Optionally configure and enabled CORS. Not each route will need to have to cors=True
debug
:bool | None
- Enables debug mode, by default False. Can be also be enabled by "POWERTOOLS_DEV" environment variable
serializer
:Callable
, optional- function to serialize
obj
to a JSON formattedstr
, by default json.dumps strip_prefixes
:list[str | Pattern]
, optional- optional list of prefixes to be removed from the request path before doing the routing. This is often used with api gateways with multiple custom mappings. Each prefix can be a static string or a compiled regex pattern
enable_validation
:bool | None
- Enables validation of the request body against the route schema, by default False.
Expand source code
class BedrockAgentResolver(ApiGatewayResolver): """Bedrock Agent Resolver See https://aws.amazon.com/bedrock/agents/ for more information. Examples -------- Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator ```python from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import BedrockAgentResolver tracer = Tracer() app = BedrockAgentResolver() @app.get("/claims") def simple_get(): return "You have 3 claims" @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context) ``` """ current_event: BedrockAgentEvent def __init__(self, debug: bool = False, enable_validation: bool = True): super().__init__( proxy_type=ProxyEventType.BedrockAgentEvent, cors=None, debug=debug, serializer=None, strip_prefixes=None, enable_validation=enable_validation, ) self._response_builder_class = BedrockResponseBuilder # Note: we need ignore[override] because we are making the optional `description` field required. @override def get( # type: ignore[override] self, rule: str, description: str, cors: bool | None = None, compress: bool = False, cache_control: str | None = None, summary: str | None = None, responses: dict[int, OpenAPIResponse] | None = None, response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION, tags: list[str] | None = None, operation_id: str | None = None, include_in_schema: bool = True, deprecated: bool = False, middlewares: list[Callable[..., Any]] | None = None, ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: openapi_extensions = None security = None return super().get( rule, cors, compress, cache_control, summary, description, responses, response_description, tags, operation_id, include_in_schema, security, openapi_extensions, deprecated, middlewares, ) # Note: we need ignore[override] because we are making the optional `description` field required. @override def post( # type: ignore[override] self, rule: str, description: str, cors: bool | None = None, compress: bool = False, cache_control: str | None = None, summary: str | None = None, responses: dict[int, OpenAPIResponse] | None = None, response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION, tags: list[str] | None = None, operation_id: str | None = None, include_in_schema: bool = True, deprecated: bool = False, middlewares: list[Callable[..., Any]] | None = None, ): openapi_extensions = None security = None return super().post( rule, cors, compress, cache_control, summary, description, responses, response_description, tags, operation_id, include_in_schema, security, openapi_extensions, deprecated, middlewares, ) # Note: we need ignore[override] because we are making the optional `description` field required. @override def put( # type: ignore[override] self, rule: str, description: str, cors: bool | None = None, compress: bool = False, cache_control: str | None = None, summary: str | None = None, responses: dict[int, OpenAPIResponse] | None = None, response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION, tags: list[str] | None = None, operation_id: str | None = None, include_in_schema: bool = True, deprecated: bool = False, middlewares: list[Callable[..., Any]] | None = None, ): openapi_extensions = None security = None return super().put( rule, cors, compress, cache_control, summary, description, responses, response_description, tags, operation_id, include_in_schema, security, openapi_extensions, deprecated, middlewares, ) # Note: we need ignore[override] because we are making the optional `description` field required. @override def patch( # type: ignore[override] self, rule: str, description: str, cors: bool | None = None, compress: bool = False, cache_control: str | None = None, summary: str | None = None, responses: dict[int, OpenAPIResponse] | None = None, response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION, tags: list[str] | None = None, operation_id: str | None = None, include_in_schema: bool = True, deprecated: bool = False, middlewares: list[Callable] | None = None, ): openapi_extensions = None security = None return super().patch( rule, cors, compress, cache_control, summary, description, responses, response_description, tags, operation_id, include_in_schema, security, openapi_extensions, deprecated, middlewares, ) # Note: we need ignore[override] because we are making the optional `description` field required. @override def delete( # type: ignore[override] self, rule: str, description: str, cors: bool | None = None, compress: bool = False, cache_control: str | None = None, summary: str | None = None, responses: dict[int, OpenAPIResponse] | None = None, response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION, tags: list[str] | None = None, operation_id: str | None = None, include_in_schema: bool = True, deprecated: bool = False, middlewares: list[Callable[..., Any]] | None = None, ): openapi_extensions = None security = None return super().delete( rule, cors, compress, cache_control, summary, description, responses, response_description, tags, operation_id, include_in_schema, security, openapi_extensions, deprecated, middlewares, ) @override def _convert_matches_into_route_keys(self, match: Match) -> dict[str, str]: # In Bedrock Agents, all the parameters come inside the "parameters" key, not on the apiPath # So we have to search for route parameters in the parameters key parameters: dict[str, str] = {} if match.groupdict() and self.current_event.parameters: parameters = {parameter["name"]: parameter["value"] for parameter in self.current_event.parameters} return parameters @override def get_openapi_json_schema( # type: ignore[override] self, *, title: str = "Powertools API", version: str = DEFAULT_API_VERSION, openapi_version: str = DEFAULT_OPENAPI_VERSION, summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None, ) -> str: """ Returns the OpenAPI schema as a JSON serializable dict. Since Bedrock Agents only support OpenAPI 3.0.0, we convert OpenAPI 3.1.0 schemas and enforce 3.0.0 compatibility for seamless integration. Parameters ---------- title: str The title of the application. version: str The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API openapi_version: str, default = "3.0.0" The version of the OpenAPI Specification (which the document uses). summary: str, optional A short summary of what the application does. description: str, optional A verbose explanation of the application behavior. tags: list[Tag, str], optional A list of tags used by the specification with additional metadata. servers: list[Server], optional An array of Server Objects, which provide connectivity information to a target server. terms_of_service: str, optional A URL to the Terms of Service for the API. MUST be in the format of a URL. contact: Contact, optional The contact information for the exposed API. license_info: License, optional The license information for the exposed API. security_schemes: dict[str, SecurityScheme]], optional A declaration of the security schemes available to be used in the specification. security: list[dict[str, list[str]]], optional A declaration of which security mechanisms are applied globally across the API. Returns ------- str The OpenAPI schema as a JSON serializable dict. """ from aws_lambda_powertools.event_handler.openapi.compat import model_json openapi_extensions = None schema = super().get_openapi_schema( title=title, version=version, openapi_version=openapi_version, summary=summary, description=description, tags=tags, servers=servers, terms_of_service=terms_of_service, contact=contact, license_info=license_info, security_schemes=security_schemes, security=security, openapi_extensions=openapi_extensions, ) schema.openapi = "3.0.3" # Transform OpenAPI 3.1 into 3.0 def inner(yaml_dict): if isinstance(yaml_dict, dict): if "anyOf" in yaml_dict and isinstance((anyOf := yaml_dict["anyOf"]), list): for i, item in enumerate(anyOf): if isinstance(item, dict) and item.get("type") == "null": anyOf.pop(i) yaml_dict["nullable"] = True if "examples" in yaml_dict: examples = yaml_dict["examples"] del yaml_dict["examples"] if isinstance(examples, list) and len(examples): yaml_dict["example"] = examples[0] for value in yaml_dict.values(): inner(value) elif isinstance(yaml_dict, list): for item in yaml_dict: inner(item) model = json.loads( model_json( schema, by_alias=True, exclude_none=True, indent=2, ), ) inner(model) return json.dumps(model)
Ancestors
- ApiGatewayResolver
- BaseRouter
- abc.ABC
Class variables
var current_event : BedrockAgentEvent
Methods
def get_openapi_json_schema(self, *, title: str = 'Powertools API', version: str = '1.0.0', openapi_version: str = '3.1.0', summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None)
-
Returns the OpenAPI schema as a JSON serializable dict. Since Bedrock Agents only support OpenAPI 3.0.0, we convert OpenAPI 3.1.0 schemas and enforce 3.0.0 compatibility for seamless integration.
Parameters
title
:str
- The title of the application.
version
:str
- The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
openapi_version
:str
, default= "3.0.0"
- The version of the OpenAPI Specification (which the document uses).
summary
:str
, optional- A short summary of what the application does.
description
:str
, optional- A verbose explanation of the application behavior.
tags
:list[Tag, str]
, optional- A list of tags used by the specification with additional metadata.
servers
:list[Server]
, optional- An array of Server Objects, which provide connectivity information to a target server.
terms_of_service
:str
, optional- A URL to the Terms of Service for the API. MUST be in the format of a URL.
contact
:Contact
, optional- The contact information for the exposed API.
license_info
:License
, optional- The license information for the exposed API.
security_schemes
:dict[str, SecurityScheme]]
, optional- A declaration of the security schemes available to be used in the specification.
security
:list[dict[str, list[str]]]
, optional- A declaration of which security mechanisms are applied globally across the API.
Returns
str
- The OpenAPI schema as a JSON serializable dict.
Inherited members
class CORSConfig (allow_origin: str = '*', extra_origins: list[str] | None = None, allow_headers: list[str] | None = None, expose_headers: list[str] | None = None, max_age: int | None = None, allow_credentials: bool = False)
-
CORS Config
Examples
Simple CORS example using the default permissive CORS, note that this should only be used during early prototyping.
from aws_lambda_powertools.event_handler.api_gateway import ( APIGatewayRestResolver, CORSConfig ) app = APIGatewayRestResolver(cors=CORSConfig()) @app.get("/my/path") def with_cors(): return {"message": "Foo"}
Using a custom CORSConfig where
with_cors
used the custom provided CORSConfig andwithout_cors
do not include any CORS headers.from aws_lambda_powertools.event_handler.api_gateway import ( APIGatewayRestResolver, CORSConfig ) cors_config = CORSConfig( allow_origin="https://wwww.example.com/", extra_origins=["https://dev.example.com/"], expose_headers=["x-exposed-response-header"], allow_headers=["x-custom-request-header"], max_age=100, allow_credentials=True, ) app = APIGatewayRestResolver(cors=cors_config) @app.get("/my/path") def with_cors(): return {"message": "Foo"} @app.get("/another-one", cors=False) def without_cors(): return {"message": "Foo"}
Parameters
allow_origin
:str
- The value of the
Access-Control-Allow-Origin
to send in the response. Defaults to "*", but should only be used during development. extra_origins
:list[str] | None
- The list of additional allowed origins.
allow_headers
:list[str] | None
- The list of additional allowed headers. This list is added to list of
built-in allowed headers:
Authorization
,Content-Type
,X-Amz-Date
,X-Api-Key
,X-Amz-Security-Token
. expose_headers
:list[str] | None
- A list of values to return for the Access-Control-Expose-Headers
max_age
:int | None
- The value for the
Access-Control-Max-Age
allow_credentials
:bool
- A boolean value that sets the value of
Access-Control-Allow-Credentials
Expand source code
class CORSConfig: """CORS Config Examples -------- Simple CORS example using the default permissive CORS, note that this should only be used during early prototyping. ```python from aws_lambda_powertools.event_handler.api_gateway import ( APIGatewayRestResolver, CORSConfig ) app = APIGatewayRestResolver(cors=CORSConfig()) @app.get("/my/path") def with_cors(): return {"message": "Foo"} ``` Using a custom CORSConfig where `with_cors` used the custom provided CORSConfig and `without_cors` do not include any CORS headers. ```python from aws_lambda_powertools.event_handler.api_gateway import ( APIGatewayRestResolver, CORSConfig ) cors_config = CORSConfig( allow_origin="https://wwww.example.com/", extra_origins=["https://dev.example.com/"], expose_headers=["x-exposed-response-header"], allow_headers=["x-custom-request-header"], max_age=100, allow_credentials=True, ) app = APIGatewayRestResolver(cors=cors_config) @app.get("/my/path") def with_cors(): return {"message": "Foo"} @app.get("/another-one", cors=False) def without_cors(): return {"message": "Foo"} ``` """ _REQUIRED_HEADERS = ["Authorization", "Content-Type", "X-Amz-Date", "X-Api-Key", "X-Amz-Security-Token"] def __init__( self, allow_origin: str = "*", extra_origins: list[str] | None = None, allow_headers: list[str] | None = None, expose_headers: list[str] | None = None, max_age: int | None = None, allow_credentials: bool = False, ): """ Parameters ---------- allow_origin: str The value of the `Access-Control-Allow-Origin` to send in the response. Defaults to "*", but should only be used during development. extra_origins: list[str] | None The list of additional allowed origins. allow_headers: list[str] | None The list of additional allowed headers. This list is added to list of built-in allowed headers: `Authorization`, `Content-Type`, `X-Amz-Date`, `X-Api-Key`, `X-Amz-Security-Token`. expose_headers: list[str] | None A list of values to return for the Access-Control-Expose-Headers max_age: int | None The value for the `Access-Control-Max-Age` allow_credentials: bool A boolean value that sets the value of `Access-Control-Allow-Credentials` """ self._allowed_origins = [allow_origin] if extra_origins: self._allowed_origins.extend(extra_origins) self.allow_headers = set(self._REQUIRED_HEADERS + (allow_headers or [])) self.expose_headers = expose_headers or [] self.max_age = max_age self.allow_credentials = allow_credentials def to_dict(self, origin: str | None) -> dict[str, str]: """Builds the configured Access-Control http headers""" # If there's no Origin, don't add any CORS headers if not origin: return {} # If the origin doesn't match any of the allowed origins, and we don't allow all origins ("*"), # don't add any CORS headers if origin not in self._allowed_origins and "*" not in self._allowed_origins: return {} # The origin matched an allowed origin, so return the CORS headers headers = { "Access-Control-Allow-Origin": origin, "Access-Control-Allow-Headers": CORSConfig.build_allow_methods(self.allow_headers), } if self.expose_headers: headers["Access-Control-Expose-Headers"] = ",".join(self.expose_headers) if self.max_age is not None: headers["Access-Control-Max-Age"] = str(self.max_age) if origin != "*" and self.allow_credentials is True: headers["Access-Control-Allow-Credentials"] = "true" return headers def allowed_origin(self, extracted_origin: str) -> str | None: if extracted_origin in self._allowed_origins: return extracted_origin if extracted_origin is not None and "*" in self._allowed_origins: return "*" return None @staticmethod def build_allow_methods(methods: set[str]) -> str: """Build sorted comma delimited methods for Access-Control-Allow-Methods header Parameters ---------- methods : set[str] Set of HTTP Methods Returns ------- set[str] Formatted string with all HTTP Methods allowed for CORS e.g., `GET, OPTIONS` """ return ",".join(sorted(methods))
Static methods
def build_allow_methods(methods: set[str]) ‑> str
-
Build sorted comma delimited methods for Access-Control-Allow-Methods header
Parameters
methods
:set[str]
- Set of HTTP Methods
Returns
set[str]
- Formatted string with all HTTP Methods allowed for CORS e.g.,
GET, OPTIONS
Methods
def allowed_origin(self, extracted_origin: str) ‑> str | None
def to_dict(self, origin: str | None) ‑> dict[str, str]
-
Builds the configured Access-Control http headers
class LambdaFunctionUrlResolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)
-
AWS Lambda Function URL resolver
Notes:
Lambda Function URL follows the API Gateway HTTP APIs Payload Format Version 2.0.
Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html - https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html#urls-payloads
Examples
Simple example integrating with Tracer
```python from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import LambdaFunctionUrlResolver
tracer = Tracer() app = LambdaFunctionUrlResolver()
@app.get("/get-call") def simple_get(): return {"message": "Foo"}
@app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data}
@tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context)
Parameters
proxy_type
:ProxyEventType
- Proxy request type, defaults to API Gateway V1
cors
:CORSConfig
- Optionally configure and enabled CORS. Not each route will need to have to cors=True
debug
:bool | None
- Enables debug mode, by default False. Can be also be enabled by "POWERTOOLS_DEV" environment variable
serializer
:Callable
, optional- function to serialize
obj
to a JSON formattedstr
, by default json.dumps strip_prefixes
:list[str | Pattern]
, optional- optional list of prefixes to be removed from the request path before doing the routing. This is often used with api gateways with multiple custom mappings. Each prefix can be a static string or a compiled regex pattern
enable_validation
:bool | None
- Enables validation of the request body against the route schema, by default False.
Expand source code
class LambdaFunctionUrlResolver(ApiGatewayResolver): """AWS Lambda Function URL resolver Notes: ----- Lambda Function URL follows the API Gateway HTTP APIs Payload Format Version 2.0. Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html - https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html#urls-payloads Examples -------- Simple example integrating with Tracer ```python from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import LambdaFunctionUrlResolver tracer = Tracer() app = LambdaFunctionUrlResolver() @app.get("/get-call") def simple_get(): return {"message": "Foo"} @app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data} @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context) """ current_event: LambdaFunctionUrlEvent def __init__( self, cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False, ): super().__init__( ProxyEventType.LambdaFunctionUrlEvent, cors, debug, serializer, strip_prefixes, enable_validation, ) def _get_base_path(self) -> str: stage = self.current_event.request_context.stage if stage and stage != "$default" and self.current_event.request_context.http.method.startswith(f"/{stage}"): return f"/{stage}" return ""
Ancestors
- ApiGatewayResolver
- BaseRouter
- abc.ABC
Class variables
var current_event : LambdaFunctionUrlEvent
Inherited members
class Response (status_code: int, content_type: str | None = None, body: ResponseT | None = None, headers: Mapping[str, str | list[str]] | None = None, cookies: list[Cookie] | None = None, compress: bool | None = None)
-
Response data class that provides greater control over what is returned from the proxy event
Parameters
status_code
:int
- Http status code, example 200
content_type
:str
- Optionally set the Content-Type header, example "application/json". Note this will be merged into any provided http headers
body
:str | bytes | None
- Optionally set the response body. Note: bytes body will be automatically base64 encoded
headers
:Mapping[str, str | list[str]]
- Optionally set specific http headers. Setting "Content-Type" here would override the
content_type
value. cookies
:list[Cookie]
- Optionally set cookies.
Expand source code
class Response(Generic[ResponseT]): """Response data class that provides greater control over what is returned from the proxy event""" def __init__( self, status_code: int, content_type: str | None = None, body: ResponseT | None = None, headers: Mapping[str, str | list[str]] | None = None, cookies: list[Cookie] | None = None, compress: bool | None = None, ): """ Parameters ---------- status_code: int Http status code, example 200 content_type: str Optionally set the Content-Type header, example "application/json". Note this will be merged into any provided http headers body: str | bytes | None Optionally set the response body. Note: bytes body will be automatically base64 encoded headers: Mapping[str, str | list[str]] Optionally set specific http headers. Setting "Content-Type" here would override the `content_type` value. cookies: list[Cookie] Optionally set cookies. """ self.status_code = status_code self.body = body self.base64_encoded = False self.headers: dict[str, str | list[str]] = dict(headers) if headers else {} self.cookies = cookies or [] self.compress = compress self.content_type = content_type if content_type: self.headers.setdefault("Content-Type", content_type) def is_json(self) -> bool: """ Returns True if the response is JSON, based on the Content-Type. """ content_type = self.headers.get("Content-Type", "") if isinstance(content_type, list): content_type = content_type[0] return content_type.startswith("application/json")
Ancestors
- typing.Generic
Methods
def is_json(self) ‑> bool
-
Returns True if the response is JSON, based on the Content-Type.
class VPCLatticeResolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)
-
VPC Lattice resolver
Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html#vpc-lattice-receiving-events
Examples
Simple example integrating with Tracer
```python from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import VPCLatticeResolver
tracer = Tracer() app = VPCLatticeResolver()
@app.get("/get-call") def simple_get(): return {"message": "Foo"}
@app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data}
@tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context)
Amazon VPC Lattice resolver
Expand source code
class VPCLatticeResolver(ApiGatewayResolver): """VPC Lattice resolver Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html#vpc-lattice-receiving-events Examples -------- Simple example integrating with Tracer ```python from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import VPCLatticeResolver tracer = Tracer() app = VPCLatticeResolver() @app.get("/get-call") def simple_get(): return {"message": "Foo"} @app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data} @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context) """ current_event: VPCLatticeEvent def __init__( self, cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False, ): """Amazon VPC Lattice resolver""" super().__init__(ProxyEventType.VPCLatticeEvent, cors, debug, serializer, strip_prefixes, enable_validation) def _get_base_path(self) -> str: return ""
Ancestors
- ApiGatewayResolver
- BaseRouter
- abc.ABC
Class variables
var current_event : VPCLatticeEvent
Inherited members
class VPCLatticeV2Resolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)
-
VPC Lattice resolver
Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html#vpc-lattice-receiving-events
Examples
Simple example integrating with Tracer
```python from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import VPCLatticeV2Resolver
tracer = Tracer() app = VPCLatticeV2Resolver()
@app.get("/get-call") def simple_get(): return {"message": "Foo"}
@app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data}
@tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context)
Amazon VPC Lattice resolver
Expand source code
class VPCLatticeV2Resolver(ApiGatewayResolver): """VPC Lattice resolver Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html#vpc-lattice-receiving-events Examples -------- Simple example integrating with Tracer ```python from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import VPCLatticeV2Resolver tracer = Tracer() app = VPCLatticeV2Resolver() @app.get("/get-call") def simple_get(): return {"message": "Foo"} @app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data} @tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context) """ current_event: VPCLatticeEventV2 def __init__( self, cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False, ): """Amazon VPC Lattice resolver""" super().__init__(ProxyEventType.VPCLatticeEventV2, cors, debug, serializer, strip_prefixes, enable_validation) def _get_base_path(self) -> str: return ""
Ancestors
- ApiGatewayResolver
- BaseRouter
- abc.ABC
Class variables
var current_event : VPCLatticeEventV2
Inherited members