Module aws_lambda_powertools.event_handler

Event handler decorators for common Lambda events

Sub-modules

aws_lambda_powertools.event_handler.api_gateway
aws_lambda_powertools.event_handler.appsync
aws_lambda_powertools.event_handler.bedrock_agent
aws_lambda_powertools.event_handler.content_types
aws_lambda_powertools.event_handler.exceptions
aws_lambda_powertools.event_handler.graphql_appsync
aws_lambda_powertools.event_handler.lambda_function_url
aws_lambda_powertools.event_handler.middlewares
aws_lambda_powertools.event_handler.openapi
aws_lambda_powertools.event_handler.router
aws_lambda_powertools.event_handler.types
aws_lambda_powertools.event_handler.util
aws_lambda_powertools.event_handler.vpc_lattice

Classes

class ALBResolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)

API Gateway and ALB proxy resolver

Examples

Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.event_handler import APIGatewayRestResolver

tracer = Tracer()
app = APIGatewayRestResolver()

@app.get("/get-call")
def simple_get():
    return {"message": "Foo"}

@app.post("/post-call")
def simple_post():
    post_data: dict = app.current_event.json_body
    return {"message": post_data["value"]}

@tracer.capture_lambda_handler
def lambda_handler(event, context):
    return app.resolve(event, context)

Amazon Application Load Balancer (ALB) resolver

Expand source code
class ALBResolver(ApiGatewayResolver):
    current_event: ALBEvent

    def __init__(
        self,
        cors: CORSConfig | None = None,
        debug: bool | None = None,
        serializer: Callable[[dict], str] | None = None,
        strip_prefixes: list[str | Pattern] | None = None,
        enable_validation: bool = False,
    ):
        """Amazon Application Load Balancer (ALB) resolver"""
        super().__init__(ProxyEventType.ALBEvent, cors, debug, serializer, strip_prefixes, enable_validation)

    def _get_base_path(self) -> str:
        # ALB doesn't have a stage variable, so we just return an empty string
        return ""

    @override
    def _to_response(self, result: dict | tuple | Response) -> Response:
        """Convert the route's result to a Response

        ALB requires a non-null body otherwise it converts as HTTP 5xx

         3 main result types are supported:

        - Dict[str, Any]: Rest api response with just the Dict to json stringify and content-type is set to
          application/json
        - Tuple[dict, int]: Same dict handling as above but with the option of including a status code
        - Response: returned as is, and allows for more flexibility
        """

        # NOTE: Minor override for early return on Response with null body for ALB
        if isinstance(result, Response) and result.body is None:
            logger.debug("ALB doesn't allow None responses; converting to empty string")
            result.body = ""

        return super()._to_response(result)

Ancestors

Class variables

var current_event : ALBEvent

Inherited members

class APIGatewayHttpResolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)

API Gateway and ALB proxy resolver

Examples

Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.event_handler import APIGatewayRestResolver

tracer = Tracer()
app = APIGatewayRestResolver()

@app.get("/get-call")
def simple_get():
    return {"message": "Foo"}

@app.post("/post-call")
def simple_post():
    post_data: dict = app.current_event.json_body
    return {"message": post_data["value"]}

@tracer.capture_lambda_handler
def lambda_handler(event, context):
    return app.resolve(event, context)

Amazon API Gateway HTTP API v2 payload resolver

Expand source code
class APIGatewayHttpResolver(ApiGatewayResolver):
    current_event: APIGatewayProxyEventV2

    def __init__(
        self,
        cors: CORSConfig | None = None,
        debug: bool | None = None,
        serializer: Callable[[dict], str] | None = None,
        strip_prefixes: list[str | Pattern] | None = None,
        enable_validation: bool = False,
    ):
        """Amazon API Gateway HTTP API v2 payload resolver"""
        super().__init__(
            ProxyEventType.APIGatewayProxyEventV2,
            cors,
            debug,
            serializer,
            strip_prefixes,
            enable_validation,
        )

    def _get_base_path(self) -> str:
        # 3 different scenarios:
        #
        # 1. SAM local: even though a stage variable is sent to the Lambda function, it's not used in the path
        # 2. API Gateway HTTP API: stage variable is used in the path
        # 3. API Gateway HTTP Custom Domain: stage variable is not used in the path
        #
        # To solve the 3 scenarios, we try to match the beginning of the path with the stage variable
        stage = self.current_event.request_context.stage
        if stage and stage != "$default" and self.current_event.request_context.http.path.startswith(f"/{stage}"):
            return f"/{stage}"
        return ""

Ancestors

Class variables

var current_event : APIGatewayProxyEventV2

Inherited members

class APIGatewayRestResolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)

API Gateway and ALB proxy resolver

Examples

Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.event_handler import APIGatewayRestResolver

tracer = Tracer()
app = APIGatewayRestResolver()

@app.get("/get-call")
def simple_get():
    return {"message": "Foo"}

@app.post("/post-call")
def simple_post():
    post_data: dict = app.current_event.json_body
    return {"message": post_data["value"]}

@tracer.capture_lambda_handler
def lambda_handler(event, context):
    return app.resolve(event, context)

Amazon API Gateway REST and HTTP API v1 payload resolver

Expand source code
class APIGatewayRestResolver(ApiGatewayResolver):
    current_event: APIGatewayProxyEvent

    def __init__(
        self,
        cors: CORSConfig | None = None,
        debug: bool | None = None,
        serializer: Callable[[dict], str] | None = None,
        strip_prefixes: list[str | Pattern] | None = None,
        enable_validation: bool = False,
    ):
        """Amazon API Gateway REST and HTTP API v1 payload resolver"""
        super().__init__(
            ProxyEventType.APIGatewayProxyEvent,
            cors,
            debug,
            serializer,
            strip_prefixes,
            enable_validation,
        )

    def _get_base_path(self) -> str:
        # 3 different scenarios:
        #
        # 1. SAM local: even though a stage variable is sent to the Lambda function, it's not used in the path
        # 2. API Gateway REST API: stage variable is used in the path
        # 3. API Gateway REST Custom Domain: stage variable is not used in the path
        #
        # To solve the 3 scenarios, we try to match the beginning of the path with the stage variable
        stage = self.current_event.request_context.stage
        if stage and stage != "$default" and self.current_event.request_context.path.startswith(f"/{stage}"):
            return f"/{stage}"
        return ""

    # override route to ignore trailing "/" in routes for REST API
    def route(
        self,
        rule: str,
        method: str | list[str] | tuple[str],
        cors: bool | None = None,
        compress: bool = False,
        cache_control: str | None = None,
        summary: str | None = None,
        description: str | None = None,
        responses: dict[int, OpenAPIResponse] | None = None,
        response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION,
        tags: list[str] | None = None,
        operation_id: str | None = None,
        include_in_schema: bool = True,
        security: list[dict[str, list[str]]] | None = None,
        openapi_extensions: dict[str, Any] | None = None,
        middlewares: list[Callable[..., Any]] | None = None,
    ):
        # NOTE: see #1552 for more context.
        return super().route(
            rule.rstrip("/"),
            method,
            cors,
            compress,
            cache_control,
            summary,
            description,
            responses,
            response_description,
            tags,
            operation_id,
            include_in_schema,
            security,
            openapi_extensions,
            middlewares,
        )

    # Override _compile_regex to exclude trailing slashes for route resolution
    @staticmethod
    def _compile_regex(rule: str, base_regex: str = _ROUTE_REGEX):
        return super(APIGatewayRestResolver, APIGatewayRestResolver)._compile_regex(rule, "^{}/*$")

Ancestors

Class variables

var current_event : APIGatewayProxyEvent

Inherited members

class ApiGatewayResolver (proxy_type: Enum = ProxyEventType.APIGatewayProxyEvent, cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)

API Gateway and ALB proxy resolver

Examples

Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.event_handler import APIGatewayRestResolver

tracer = Tracer()
app = APIGatewayRestResolver()

@app.get("/get-call")
def simple_get():
    return {"message": "Foo"}

@app.post("/post-call")
def simple_post():
    post_data: dict = app.current_event.json_body
    return {"message": post_data["value"]}

@tracer.capture_lambda_handler
def lambda_handler(event, context):
    return app.resolve(event, context)

Parameters

proxy_type : ProxyEventType
Proxy request type, defaults to API Gateway V1
cors : CORSConfig
Optionally configure and enabled CORS. Not each route will need to have to cors=True
debug : bool | None
Enables debug mode, by default False. Can be also be enabled by "POWERTOOLS_DEV" environment variable
serializer : Callable, optional
function to serialize obj to a JSON formatted str, by default json.dumps
strip_prefixes : list[str | Pattern], optional
optional list of prefixes to be removed from the request path before doing the routing. This is often used with api gateways with multiple custom mappings. Each prefix can be a static string or a compiled regex pattern
enable_validation : bool | None
Enables validation of the request body against the route schema, by default False.
Expand source code
class ApiGatewayResolver(BaseRouter):
    """API Gateway and ALB proxy resolver

    Examples
    --------
    Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator

    ```python
    from aws_lambda_powertools import Tracer
    from aws_lambda_powertools.event_handler import APIGatewayRestResolver

    tracer = Tracer()
    app = APIGatewayRestResolver()

    @app.get("/get-call")
    def simple_get():
        return {"message": "Foo"}

    @app.post("/post-call")
    def simple_post():
        post_data: dict = app.current_event.json_body
        return {"message": post_data["value"]}

    @tracer.capture_lambda_handler
    def lambda_handler(event, context):
        return app.resolve(event, context)
    ```
    """

    def __init__(
        self,
        proxy_type: Enum = ProxyEventType.APIGatewayProxyEvent,
        cors: CORSConfig | None = None,
        debug: bool | None = None,
        serializer: Callable[[dict], str] | None = None,
        strip_prefixes: list[str | Pattern] | None = None,
        enable_validation: bool = False,
    ):
        """
        Parameters
        ----------
        proxy_type: ProxyEventType
            Proxy request type, defaults to API Gateway V1
        cors: CORSConfig
            Optionally configure and enabled CORS. Not each route will need to have to cors=True
        debug: bool | None
            Enables debug mode, by default False. Can be also be enabled by "POWERTOOLS_DEV"
            environment variable
        serializer: Callable, optional
            function to serialize `obj` to a JSON formatted `str`, by default json.dumps
        strip_prefixes: list[str | Pattern], optional
            optional list of prefixes to be removed from the request path before doing the routing.
            This is often used with api gateways with multiple custom mappings.
            Each prefix can be a static string or a compiled regex pattern
        enable_validation: bool | None
            Enables validation of the request body against the route schema, by default False.
        """
        self._proxy_type = proxy_type
        self._dynamic_routes: list[Route] = []
        self._static_routes: list[Route] = []
        self._route_keys: list[str] = []
        self._exception_handlers: dict[type, Callable] = {}
        self._cors = cors
        self._cors_enabled: bool = cors is not None
        self._cors_methods: set[str] = {"OPTIONS"}
        self._debug = self._has_debug(debug)
        self._enable_validation = enable_validation
        self._strip_prefixes = strip_prefixes
        self.context: dict = {}  # early init as customers might add context before event resolution
        self.processed_stack_frames = []
        self._response_builder_class = ResponseBuilder[BaseProxyEvent]

        # Allow for a custom serializer or a concise json serialization
        self._serializer = serializer or partial(json.dumps, separators=(",", ":"), cls=Encoder)

        if self._enable_validation:
            from aws_lambda_powertools.event_handler.middlewares.openapi_validation import OpenAPIValidationMiddleware

            # Note the serializer argument: only use custom serializer if provided by the caller
            # Otherwise, fully rely on the internal Pydantic based mechanism to serialize responses for validation.
            self.use([OpenAPIValidationMiddleware(validation_serializer=serializer)])

    def get_openapi_schema(
        self,
        *,
        title: str = "Powertools API",
        version: str = DEFAULT_API_VERSION,
        openapi_version: str = DEFAULT_OPENAPI_VERSION,
        summary: str | None = None,
        description: str | None = None,
        tags: list[Tag | str] | None = None,
        servers: list[Server] | None = None,
        terms_of_service: str | None = None,
        contact: Contact | None = None,
        license_info: License | None = None,
        security_schemes: dict[str, SecurityScheme] | None = None,
        security: list[dict[str, list[str]]] | None = None,
        openapi_extensions: dict[str, Any] | None = None,
    ) -> OpenAPI:
        """
        Returns the OpenAPI schema as a pydantic model.

        Parameters
        ----------
        title: str
            The title of the application.
        version: str
            The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
        openapi_version: str, default = "3.0.0"
            The version of the OpenAPI Specification (which the document uses).
        summary: str, optional
            A short summary of what the application does.
        description: str, optional
            A verbose explanation of the application behavior.
        tags: list[Tag | str], optional
            A list of tags used by the specification with additional metadata.
        servers: list[Server], optional
            An array of Server Objects, which provide connectivity information to a target server.
        terms_of_service: str, optional
            A URL to the Terms of Service for the API. MUST be in the format of a URL.
        contact: Contact, optional
            The contact information for the exposed API.
        license_info: License, optional
            The license information for the exposed API.
        security_schemes: dict[str, SecurityScheme]], optional
            A declaration of the security schemes available to be used in the specification.
        security: list[dict[str, list[str]]], optional
            A declaration of which security mechanisms are applied globally across the API.
        openapi_extensions: Dict[str, Any], optional
            Additional OpenAPI extensions as a dictionary.

        Returns
        -------
        OpenAPI: pydantic model
            The OpenAPI schema as a pydantic model.
        """

        from aws_lambda_powertools.event_handler.openapi.compat import (
            GenerateJsonSchema,
            get_compat_model_name_map,
            get_definitions,
        )
        from aws_lambda_powertools.event_handler.openapi.models import OpenAPI, PathItem, Tag
        from aws_lambda_powertools.event_handler.openapi.types import (
            COMPONENT_REF_TEMPLATE,
        )

        openapi_version = self._determine_openapi_version(openapi_version)

        # Start with the bare minimum required for a valid OpenAPI schema
        info: dict[str, Any] = {"title": title, "version": version}

        optional_fields = {
            "summary": summary,
            "description": description,
            "termsOfService": terms_of_service,
            "contact": contact,
            "license": license_info,
        }

        info.update({field: value for field, value in optional_fields.items() if value})

        if not isinstance(openapi_extensions, dict):
            openapi_extensions = {}

        output: dict[str, Any] = {
            "openapi": openapi_version,
            "info": info,
            "servers": self._get_openapi_servers(servers),
            "security": self._get_openapi_security(security, security_schemes),
            **openapi_extensions,
        }

        components: dict[str, dict[str, Any]] = {}
        paths: dict[str, dict[str, Any]] = {}
        operation_ids: set[str] = set()

        all_routes = self._dynamic_routes + self._static_routes
        all_fields = self._get_fields_from_routes(all_routes)
        model_name_map = get_compat_model_name_map(all_fields)

        # Collect all models and definitions
        schema_generator = GenerateJsonSchema(ref_template=COMPONENT_REF_TEMPLATE)
        field_mapping, definitions = get_definitions(
            fields=all_fields,
            schema_generator=schema_generator,
            model_name_map=model_name_map,
        )

        # Add routes to the OpenAPI schema
        for route in all_routes:

            if route.security and not _validate_openapi_security_parameters(
                security=route.security,
                security_schemes=security_schemes,
            ):
                raise SchemaValidationError(
                    "Security configuration was not found in security_schemas or security_schema was not defined. "
                    "See: https://docs.powertools.aws.dev/lambda/python/latest/core/event_handler/api_gateway/#security-schemes",
                )

            if not route.include_in_schema:
                continue

            result = route._get_openapi_path(
                dependant=route.dependant,
                operation_ids=operation_ids,
                model_name_map=model_name_map,
                field_mapping=field_mapping,
            )
            if result:
                path, path_definitions = result
                if path:
                    paths.setdefault(route.openapi_path, {}).update(path)
                if path_definitions:
                    definitions.update(path_definitions)

        if definitions:
            components["schemas"] = {k: definitions[k] for k in sorted(definitions)}
        if security_schemes:
            components["securitySchemes"] = security_schemes
        if components:
            output["components"] = components
        if tags:
            output["tags"] = [Tag(name=tag) if isinstance(tag, str) else tag for tag in tags]

        output["paths"] = {k: PathItem(**v) for k, v in paths.items()}

        return OpenAPI(**output)

    @staticmethod
    def _get_openapi_servers(servers: list[Server] | None) -> list[Server]:
        from aws_lambda_powertools.event_handler.openapi.models import Server

        # If the 'servers' property is not provided or is an empty array,
        # the default behavior is to return a Server Object with a URL value of "/".
        return servers if servers else [Server(url="/")]

    @staticmethod
    def _get_openapi_security(
        security: list[dict[str, list[str]]] | None,
        security_schemes: dict[str, SecurityScheme] | None,
    ) -> list[dict[str, list[str]]] | None:
        if not security:
            return None

        if not _validate_openapi_security_parameters(security=security, security_schemes=security_schemes):
            raise SchemaValidationError(
                "Security configuration was not found in security_schemas or security_schema was not defined. "
                "See: https://docs.powertools.aws.dev/lambda/python/latest/core/event_handler/api_gateway/#security-schemes",
            )

        return security

    @staticmethod
    def _determine_openapi_version(openapi_version: str):

        # Pydantic V2 has no support for OpenAPI schema 3.0
        if not openapi_version.startswith("3.1"):
            warnings.warn(
                "You are using Pydantic v2, which is incompatible with OpenAPI schema 3.0. Forcing OpenAPI 3.1",
                stacklevel=2,
            )
            openapi_version = "3.1.0"
        return openapi_version

    def get_openapi_json_schema(
        self,
        *,
        title: str = "Powertools API",
        version: str = DEFAULT_API_VERSION,
        openapi_version: str = DEFAULT_OPENAPI_VERSION,
        summary: str | None = None,
        description: str | None = None,
        tags: list[Tag | str] | None = None,
        servers: list[Server] | None = None,
        terms_of_service: str | None = None,
        contact: Contact | None = None,
        license_info: License | None = None,
        security_schemes: dict[str, SecurityScheme] | None = None,
        security: list[dict[str, list[str]]] | None = None,
        openapi_extensions: dict[str, Any] | None = None,
    ) -> str:
        """
        Returns the OpenAPI schema as a JSON serializable dict

        Parameters
        ----------
        title: str
            The title of the application.
        version: str
            The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
        openapi_version: str, default = "3.0.0"
            The version of the OpenAPI Specification (which the document uses).
        summary: str, optional
            A short summary of what the application does.
        description: str, optional
            A verbose explanation of the application behavior.
        tags: list[Tag, str], optional
            A list of tags used by the specification with additional metadata.
        servers: list[Server], optional
            An array of Server Objects, which provide connectivity information to a target server.
        terms_of_service: str, optional
            A URL to the Terms of Service for the API. MUST be in the format of a URL.
        contact: Contact, optional
            The contact information for the exposed API.
        license_info: License, optional
            The license information for the exposed API.
        security_schemes: dict[str, SecurityScheme]], optional
            A declaration of the security schemes available to be used in the specification.
        security: list[dict[str, list[str]]], optional
            A declaration of which security mechanisms are applied globally across the API.
        openapi_extensions: Dict[str, Any], optional
            Additional OpenAPI extensions as a dictionary.

        Returns
        -------
        str
            The OpenAPI schema as a JSON serializable dict.
        """
        from aws_lambda_powertools.event_handler.openapi.compat import model_json

        return model_json(
            self.get_openapi_schema(
                title=title,
                version=version,
                openapi_version=openapi_version,
                summary=summary,
                description=description,
                tags=tags,
                servers=servers,
                terms_of_service=terms_of_service,
                contact=contact,
                license_info=license_info,
                security_schemes=security_schemes,
                security=security,
                openapi_extensions=openapi_extensions,
            ),
            by_alias=True,
            exclude_none=True,
            indent=2,
        )

    def enable_swagger(
        self,
        *,
        path: str = "/swagger",
        title: str = "Powertools for AWS Lambda (Python) API",
        version: str = DEFAULT_API_VERSION,
        openapi_version: str = DEFAULT_OPENAPI_VERSION,
        summary: str | None = None,
        description: str | None = None,
        tags: list[Tag | str] | None = None,
        servers: list[Server] | None = None,
        terms_of_service: str | None = None,
        contact: Contact | None = None,
        license_info: License | None = None,
        swagger_base_url: str | None = None,
        middlewares: list[Callable[..., Response]] | None = None,
        compress: bool = False,
        security_schemes: dict[str, SecurityScheme] | None = None,
        security: list[dict[str, list[str]]] | None = None,
        oauth2_config: OAuth2Config | None = None,
        persist_authorization: bool = False,
        openapi_extensions: dict[str, Any] | None = None,
    ):
        """
        Returns the OpenAPI schema as a JSON serializable dict

        Parameters
        ----------
        path: str, default = "/swagger"
            The path to the swagger UI.
        title: str
            The title of the application.
        version: str
            The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
        openapi_version: str, default = "3.0.0"
            The version of the OpenAPI Specification (which the document uses).
        summary: str, optional
            A short summary of what the application does.
        description: str, optional
            A verbose explanation of the application behavior.
        tags: list[Tag, str], optional
            A list of tags used by the specification with additional metadata.
        servers: list[Server], optional
            An array of Server Objects, which provide connectivity information to a target server.
        terms_of_service: str, optional
            A URL to the Terms of Service for the API. MUST be in the format of a URL.
        contact: Contact, optional
            The contact information for the exposed API.
        license_info: License, optional
            The license information for the exposed API.
        swagger_base_url: str, optional
            The base url for the swagger UI. If not provided, we will serve a recent version of the Swagger UI.
        middlewares: list[Callable[..., Response]], optional
            List of middlewares to be used for the swagger route.
        compress: bool, default = False
            Whether or not to enable gzip compression swagger route.
        security_schemes: dict[str, "SecurityScheme"], optional
            A declaration of the security schemes available to be used in the specification.
        security: list[dict[str, list[str]]], optional
            A declaration of which security mechanisms are applied globally across the API.
        oauth2_config: OAuth2Config, optional
            The OAuth2 configuration for the Swagger UI.
        persist_authorization: bool, optional
            Whether to persist authorization data on browser close/refresh.
        openapi_extensions: dict[str, Any], optional
            Additional OpenAPI extensions as a dictionary.
        """
        from aws_lambda_powertools.event_handler.openapi.compat import model_json
        from aws_lambda_powertools.event_handler.openapi.models import Server
        from aws_lambda_powertools.event_handler.openapi.swagger_ui import (
            generate_oauth2_redirect_html,
            generate_swagger_html,
        )

        @self.get(path, middlewares=middlewares, include_in_schema=False, compress=compress)
        def swagger_handler():
            query_params = self.current_event.query_string_parameters or {}

            # Check for query parameters; if "format" is specified as "oauth2-redirect",
            # send the oauth2-redirect HTML stanza so OAuth2 can be used
            # Source: https://github.com/swagger-api/swagger-ui/blob/master/dist/oauth2-redirect.html
            if query_params.get("format") == "oauth2-redirect":
                return Response(
                    status_code=200,
                    content_type="text/html",
                    body=generate_oauth2_redirect_html(),
                )

            base_path = self._get_base_path()

            if swagger_base_url:
                swagger_js = f"{swagger_base_url}/swagger-ui-bundle.min.js"
                swagger_css = f"{swagger_base_url}/swagger-ui.min.css"
            else:
                # We now inject CSS and JS into the SwaggerUI file
                swagger_js = Path.open(
                    Path(__file__).parent / "openapi" / "swagger_ui" / "swagger-ui-bundle.min.js",
                ).read()
                swagger_css = Path.open(Path(__file__).parent / "openapi" / "swagger_ui" / "swagger-ui.min.css").read()

            openapi_servers = servers or [Server(url=(base_path or "/"))]

            spec = self.get_openapi_schema(
                title=title,
                version=version,
                openapi_version=openapi_version,
                summary=summary,
                description=description,
                tags=tags,
                servers=openapi_servers,
                terms_of_service=terms_of_service,
                contact=contact,
                license_info=license_info,
                security_schemes=security_schemes,
                security=security,
                openapi_extensions=openapi_extensions,
            )

            # The .replace('</', '<\\/') part is necessary to prevent a potential issue where the JSON string contains
            # </script> or similar tags. Escaping the forward slash in </ as <\/ ensures that the JSON does not
            # inadvertently close the script tag, and the JSON remains a valid string within the JavaScript code.
            escaped_spec = model_json(
                spec,
                by_alias=True,
                exclude_none=True,
                indent=2,
            ).replace("</", "<\\/")

            # Check for query parameters; if "format" is specified as "json",
            # respond with the JSON used in the OpenAPI spec
            # Example: https://www.example.com/swagger?format=json
            if query_params.get("format") == "json":
                return Response(
                    status_code=200,
                    content_type="application/json",
                    body=escaped_spec,
                )

            body = generate_swagger_html(
                escaped_spec,
                swagger_js,
                swagger_css,
                swagger_base_url,
                oauth2_config,
                persist_authorization,
            )

            return Response(
                status_code=200,
                content_type="text/html",
                body=body,
            )

    def route(
        self,
        rule: str,
        method: str | list[str] | tuple[str],
        cors: bool | None = None,
        compress: bool = False,
        cache_control: str | None = None,
        summary: str | None = None,
        description: str | None = None,
        responses: dict[int, OpenAPIResponse] | None = None,
        response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION,
        tags: list[str] | None = None,
        operation_id: str | None = None,
        include_in_schema: bool = True,
        security: list[dict[str, list[str]]] | None = None,
        openapi_extensions: dict[str, Any] | None = None,
        middlewares: list[Callable[..., Any]] | None = None,
    ):
        """Route decorator includes parameter `method`"""

        def register_resolver(func: Callable):
            methods = (method,) if isinstance(method, str) else method
            logger.debug(f"Adding route using rule {rule} and methods: {','.join(m.upper() for m in methods)}")

            cors_enabled = self._cors_enabled if cors is None else cors

            for item in methods:
                _route = Route(
                    item,
                    rule,
                    self._compile_regex(rule),
                    func,
                    cors_enabled,
                    compress,
                    cache_control,
                    summary,
                    description,
                    responses,
                    response_description,
                    tags,
                    operation_id,
                    include_in_schema,
                    security,
                    openapi_extensions,
                    middlewares,
                )

                # The more specific route wins.
                # We store dynamic (/studies/{studyid}) and static routes (/studies/fetch) separately.
                # Then attempt a match for static routes before dynamic routes.
                # This ensures that the most specific route is prioritized and processed first (studies/fetch).
                if _route.rule.groups > 0:
                    self._dynamic_routes.append(_route)
                else:
                    self._static_routes.append(_route)

                self._create_route_key(item, rule)

                if cors_enabled:
                    logger.debug(f"Registering method {item.upper()} to Allow Methods in CORS")
                    self._cors_methods.add(item.upper())

            return func

        return register_resolver

    def resolve(self, event, context) -> dict[str, Any]:
        """Resolves the response based on the provide event and decorator routes

        ## Internals

        Request processing chain is triggered by a Route object being called _(`_call_route` -> `__call__`)_:

        1. **When a route is matched**
          1.1. Exception handlers _(if any exception bubbled up and caught)_
          1.2. Global middlewares _(before, and after on the way back)_
          1.3. Path level middleware _(before, and after on the way back)_
          1.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter)
          1.5. Run actual route
        2. **When a route is NOT matched**
          2.1. Exception handlers _(if any exception bubbled up and caught)_
          2.2. Global middlewares _(before, and after on the way back)_
          2.3. Path level middleware _(before, and after on the way back)_
          2.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter)
          2.5. Run 404 route handler
        3. **When a route is a pre-flight CORS (often not matched)**
          3.1. Exception handlers _(if any exception bubbled up and caught)_
          3.2. Global middlewares _(before, and after on the way back)_
          3.3. Path level middleware _(before, and after on the way back)_
          3.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter)
          3.5. Return 204 with appropriate CORS headers
        4. **When a route is matched with Data Validation enabled**
          4.1. Exception handlers _(if any exception bubbled up and caught)_
          4.2. Data Validation middleware _(before, and after on the way back)_
          4.3. Global middlewares _(before, and after on the way back)_
          4.4. Path level middleware _(before, and after on the way back)_
          4.5. Middleware adapter to ensure Response is homogenous (_registered_api_adapter)
          4.6. Run actual route

        Parameters
        ----------
        event: dict[str, Any]
            Event
        context: LambdaContext
            Lambda context
        Returns
        -------
        dict
            Returns the dict response
        """
        if isinstance(event, BaseProxyEvent):
            warnings.warn(
                "You don't need to serialize event to Event Source Data Class when using Event Handler; "
                "see issue #1152",
                stacklevel=2,
            )
            event = event.raw_event

        if self._debug:
            print(self._serializer(event))

        # Populate router(s) dependencies without keeping a reference to each registered router
        BaseRouter.current_event = self._to_proxy_event(event)
        BaseRouter.lambda_context = context

        response = self._resolve().build(self.current_event, self._cors)

        # Debug print Processed Middlewares
        if self._debug:
            print("\nProcessed Middlewares:")
            print("======================")
            print("\n".join(self.processed_stack_frames))
            print("======================")

        self.clear_context()

        return response

    def __call__(self, event, context) -> Any:
        return self.resolve(event, context)

    def _create_route_key(self, item: str, rule: str):
        route_key = item + rule
        if route_key in self._route_keys:
            warnings.warn(
                f"A route like this was already registered. method: '{item}' rule: '{rule}'",
                stacklevel=2,
            )
        self._route_keys.append(route_key)

    def _get_base_path(self) -> str:
        raise NotImplementedError()

    @staticmethod
    def _has_debug(debug: bool | None = None) -> bool:
        # It might have been explicitly switched off (debug=False)
        if debug is not None:
            return debug

        return powertools_dev_is_set()

    @staticmethod
    def _compile_regex(rule: str, base_regex: str = _ROUTE_REGEX):
        """Precompile regex pattern

        Logic
        -----

        1. Find any dynamic routes defined as <pattern>
            e.g. @app.get("/accounts/<account_id>")
        2. Create a new regex by substituting every dynamic route found as a named group (?P<group>),
        and match whole words only (word boundary) instead of a greedy match

            non-greedy example with word boundary

                rule: '/accounts/<account_id>'
                regex: r'/accounts/(?P<account_id>\\w+\\b)'

                value: /accounts/123/some_other_path
                account_id: 123

            greedy example without word boundary

                regex: r'/accounts/(?P<account_id>.+)'

                value: /accounts/123/some_other_path
                account_id: 123/some_other_path
        3. Compiles a regex and include start (^) and end ($) in between for an exact match

        NOTE: See #520 for context
        """
        rule_regex: str = re.sub(_DYNAMIC_ROUTE_PATTERN, _NAMED_GROUP_BOUNDARY_PATTERN, rule)
        return re.compile(base_regex.format(rule_regex))

    def _to_proxy_event(self, event: dict) -> BaseProxyEvent:  # noqa: PLR0911  # ignore many returns
        """Convert the event dict to the corresponding data class"""
        if self._proxy_type == ProxyEventType.APIGatewayProxyEvent:
            logger.debug("Converting event to API Gateway REST API contract")
            return APIGatewayProxyEvent(event)
        if self._proxy_type == ProxyEventType.APIGatewayProxyEventV2:
            logger.debug("Converting event to API Gateway HTTP API contract")
            return APIGatewayProxyEventV2(event)
        if self._proxy_type == ProxyEventType.BedrockAgentEvent:
            logger.debug("Converting event to Bedrock Agent contract")
            return BedrockAgentEvent(event)
        if self._proxy_type == ProxyEventType.LambdaFunctionUrlEvent:
            logger.debug("Converting event to Lambda Function URL contract")
            return LambdaFunctionUrlEvent(event)
        if self._proxy_type == ProxyEventType.VPCLatticeEvent:
            logger.debug("Converting event to VPC Lattice contract")
            return VPCLatticeEvent(event)
        if self._proxy_type == ProxyEventType.VPCLatticeEventV2:
            logger.debug("Converting event to VPC LatticeV2 contract")
            return VPCLatticeEventV2(event)
        logger.debug("Converting event to ALB contract")
        return ALBEvent(event)

    def _resolve(self) -> ResponseBuilder:
        """Resolves the response or return the not found response"""
        method = self.current_event.http_method.upper()
        path = self._remove_prefix(self.current_event.path)

        registered_routes = self._static_routes + self._dynamic_routes

        for route in registered_routes:
            if method != route.method:
                continue
            match_results: Match | None = route.rule.match(path)
            if match_results:
                logger.debug("Found a registered route. Calling function")
                # Add matched Route reference into the Resolver context
                self.append_context(_route=route, _path=path)

                route_keys = self._convert_matches_into_route_keys(match_results)
                return self._call_route(route, route_keys)  # pass fn args

        return self._handle_not_found(method=method, path=path)

    def _remove_prefix(self, path: str) -> str:
        """Remove the configured prefix from the path"""
        if not isinstance(self._strip_prefixes, list):
            return path

        for prefix in self._strip_prefixes:
            if isinstance(prefix, str):
                if path == prefix:
                    return "/"

                if self._path_starts_with(path, prefix):
                    return path[len(prefix) :]

            if isinstance(prefix, Pattern):
                path = re.sub(prefix, "", path)

                # When using regexes, we might get into a point where everything is removed
                # from the string, so we check if it's empty and return /, since there's nothing
                # else to strip anymore.
                if not path:
                    return "/"

        return path

    def _convert_matches_into_route_keys(self, match: Match) -> dict[str, str]:
        """Converts the regex match into a dict of route keys"""
        return match.groupdict()

    @staticmethod
    def _path_starts_with(path: str, prefix: str):
        """Returns true if the `path` starts with a prefix plus a `/`"""
        if not isinstance(prefix, str) or prefix == "":
            return False

        return path.startswith(prefix + "/")

    def _handle_not_found(self, method: str, path: str) -> ResponseBuilder:
        """Called when no matching route was found and includes support for the cors preflight response"""
        logger.debug(f"No match found for path {path} and method {method}")

        def not_found_handler():
            """Route handler for 404s

            It handles in the following order:

            1. Pre-flight CORS requests (OPTIONS)
            2. Detects and calls custom HTTP 404 handler
            3. Returns standard 404 along with CORS headers

            Returns
            -------
            Response
                HTTP 404 response
            """
            _headers: dict[str, Any] = {}

            # Pre-flight request? Return immediately to avoid browser error
            if self._cors and method == "OPTIONS":
                logger.debug("Pre-flight request detected. Returning CORS with empty response")
                _headers["Access-Control-Allow-Methods"] = CORSConfig.build_allow_methods(self._cors_methods)

                return Response(status_code=204, content_type=None, headers=_headers, body="")

            # Customer registered 404 route? Call it.
            custom_not_found_handler = self._lookup_exception_handler(NotFoundError)
            if custom_not_found_handler:
                return custom_not_found_handler(NotFoundError())

            # No CORS and no custom 404 fn? Default response
            return Response(
                status_code=HTTPStatus.NOT_FOUND.value,
                content_type=content_types.APPLICATION_JSON,
                headers=_headers,
                body={"statusCode": HTTPStatus.NOT_FOUND.value, "message": "Not found"},
            )

        # We create a route to trigger entire request chain (middleware+exception handlers)
        route = Route(
            rule=self._compile_regex(r".*"),
            method=method,
            path=path,
            func=not_found_handler,
            cors=self._cors_enabled,
            compress=False,
        )

        # Add matched Route reference into the Resolver context
        self.append_context(_route=route, _path=path)

        # Kick-off request chain:
        # -> exception_handlers()
        # --> middlewares()
        # ---> not_found_route()
        return self._call_route(route=route, route_arguments={})

    def _call_route(self, route: Route, route_arguments: dict[str, str]) -> ResponseBuilder:
        """Actually call the matching route with any provided keyword arguments."""
        try:
            # Reset Processed stack for Middleware (for debugging purposes)
            self._reset_processed_stack()

            return self._response_builder_class(
                response=self._to_response(
                    route(router_middlewares=self._router_middlewares, app=self, route_arguments=route_arguments),
                ),
                serializer=self._serializer,
                route=route,
            )
        except Exception as exc:
            # If exception is handled then return the response builder to reduce noise
            response_builder = self._call_exception_handler(exc, route)
            if response_builder:
                return response_builder

            logger.exception(exc)
            if self._debug:
                # If the user has turned on debug mode,
                # we'll let the original exception propagate, so
                # they get more information about what went wrong.
                return self._response_builder_class(
                    response=Response(
                        status_code=500,
                        content_type=content_types.TEXT_PLAIN,
                        body="".join(traceback.format_exc()),
                    ),
                    serializer=self._serializer,
                    route=route,
                )

            raise

    def not_found(self, func: Callable | None = None):
        if func is None:
            return self.exception_handler(NotFoundError)
        return self.exception_handler(NotFoundError)(func)

    def exception_handler(self, exc_class: type[Exception] | list[type[Exception]]):
        def register_exception_handler(func: Callable):
            if isinstance(exc_class, list):  # pragma: no cover
                for exp in exc_class:
                    self._exception_handlers[exp] = func
            else:
                self._exception_handlers[exc_class] = func
            return func

        return register_exception_handler

    def _lookup_exception_handler(self, exp_type: type) -> Callable | None:
        # Use "Method Resolution Order" to allow for matching against a base class
        # of an exception
        for cls in exp_type.__mro__:
            if cls in self._exception_handlers:
                return self._exception_handlers[cls]
        return None

    def _call_exception_handler(self, exp: Exception, route: Route) -> ResponseBuilder | None:
        handler = self._lookup_exception_handler(type(exp))
        if handler:
            try:
                return self._response_builder_class(response=handler(exp), serializer=self._serializer, route=route)
            except ServiceError as service_error:
                exp = service_error

        if isinstance(exp, RequestValidationError):
            # For security reasons, we hide msg details (don't leak Python, Pydantic or file names)
            errors = [{"loc": e["loc"], "type": e["type"]} for e in exp.errors()]

            return self._response_builder_class(
                response=Response(
                    status_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                    content_type=content_types.APPLICATION_JSON,
                    body={"statusCode": HTTPStatus.UNPROCESSABLE_ENTITY, "detail": errors},
                ),
                serializer=self._serializer,
                route=route,
            )

        if isinstance(exp, ServiceError):
            return self._response_builder_class(
                response=Response(
                    status_code=exp.status_code,
                    content_type=content_types.APPLICATION_JSON,
                    body={"statusCode": exp.status_code, "message": exp.msg},
                ),
                serializer=self._serializer,
                route=route,
            )

        return None

    def _to_response(self, result: dict | tuple | Response) -> Response:
        """Convert the route's result to a Response

         3 main result types are supported:

        - dict[str, Any]: Rest api response with just the dict to json stringify and content-type is set to
          application/json
        - tuple[dict, int]: Same dict handling as above but with the option of including a status code
        - Response: returned as is, and allows for more flexibility
        """
        status_code = HTTPStatus.OK
        if isinstance(result, Response):
            return result
        elif isinstance(result, tuple) and len(result) == 2:
            # Unpack result dict and status code from tuple
            result, status_code = result

        logger.debug("Simple response detected, serializing return before constructing final response")
        return Response(
            status_code=status_code,
            content_type=content_types.APPLICATION_JSON,
            body=result,
        )

    def include_router(self, router: Router, prefix: str | None = None) -> None:
        """Adds all routes and context defined in a router

        Parameters
        ----------
        router : Router
            The Router containing a list of routes to be registered after the existing routes
        prefix : str, optional
            An optional prefix to be added to the originally defined rule
        """

        # Add reference to parent ApiGatewayResolver to support use cases where people subclass it to add custom logic
        router.api_resolver = self

        logger.debug("Merging App context with Router context")
        self.context.update(**router.context)

        logger.debug("Appending Router middlewares into App middlewares.")
        self._router_middlewares = self._router_middlewares + router._router_middlewares

        logger.debug("Appending Router exception_handler into App exception_handler.")
        self._exception_handlers.update(router._exception_handlers)

        # use pointer to allow context clearance after event is processed e.g., resolve(evt, ctx)
        router.context = self.context

        # Iterate through the routes defined in the router to configure and apply middlewares for each route
        for route, func in router._routes.items():
            new_route = route

            if prefix:
                rule = route[0]
                rule = prefix if rule == "/" else f"{prefix}{rule}"
                new_route = (rule, *route[1:])

            # Middlewares are stored by route separately - must grab them to include
            # Middleware store the route without prefix, so we must not include prefix when grabbing
            middlewares = router._routes_with_middleware.get(route)

            # Need to use "type: ignore" here since mypy does not like a named parameter after
            # tuple expansion since may cause duplicate named parameters in the function signature.
            # In this case this is not possible since the tuple expansion is from a hashable source
            # and the `middlewares` list is a non-hashable structure so will never be included.
            # Still need to ignore for mypy checks or will cause failures (false-positive)
            self.route(*new_route, middlewares=middlewares)(func)  # type: ignore

    @staticmethod
    def _get_fields_from_routes(routes: Sequence[Route]) -> list[ModelField]:
        """
        Returns a list of fields from the routes
        """

        from aws_lambda_powertools.event_handler.openapi.compat import ModelField
        from aws_lambda_powertools.event_handler.openapi.dependant import (
            get_flat_params,
        )

        body_fields_from_routes: list[ModelField] = []
        responses_from_routes: list[ModelField] = []
        request_fields_from_routes: list[ModelField] = []

        for route in routes:
            if route.body_field:
                if not isinstance(route.body_field, ModelField):
                    raise AssertionError("A request body myst be a Pydantic Field")
                body_fields_from_routes.append(route.body_field)

            params = get_flat_params(route.dependant)
            request_fields_from_routes.extend(params)

            if route.dependant.return_param:
                responses_from_routes.append(route.dependant.return_param)

            if route.dependant.response_extra_models:
                responses_from_routes.extend(route.dependant.response_extra_models)

        flat_models = list(responses_from_routes + request_fields_from_routes + body_fields_from_routes)
        return flat_models

Ancestors

Subclasses

Methods

def enable_swagger(self, *, path: str = '/swagger', title: str = 'Powertools for AWS Lambda (Python) API', version: str = '1.0.0', openapi_version: str = '3.1.0', summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, swagger_base_url: str | None = None, middlewares: list[Callable[..., Response]] | None = None, compress: bool = False, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None, oauth2_config: OAuth2Config | None = None, persist_authorization: bool = False, openapi_extensions: dict[str, Any] | None = None)

Returns the OpenAPI schema as a JSON serializable dict

Parameters

path : str, default = "/swagger"
The path to the swagger UI.
title : str
The title of the application.
version : str
The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
openapi_version : str, default = "3.0.0"
The version of the OpenAPI Specification (which the document uses).
summary : str, optional
A short summary of what the application does.
description : str, optional
A verbose explanation of the application behavior.
tags : list[Tag, str], optional
A list of tags used by the specification with additional metadata.
servers : list[Server], optional
An array of Server Objects, which provide connectivity information to a target server.
terms_of_service : str, optional
A URL to the Terms of Service for the API. MUST be in the format of a URL.
contact : Contact, optional
The contact information for the exposed API.
license_info : License, optional
The license information for the exposed API.
swagger_base_url : str, optional
The base url for the swagger UI. If not provided, we will serve a recent version of the Swagger UI.
middlewares : list[Callable[…, Response]], optional
List of middlewares to be used for the swagger route.
compress : bool, default = False
Whether or not to enable gzip compression swagger route.
security_schemes : dict[str, "SecurityScheme"], optional
A declaration of the security schemes available to be used in the specification.
security : list[dict[str, list[str]]], optional
A declaration of which security mechanisms are applied globally across the API.
oauth2_config : OAuth2Config, optional
The OAuth2 configuration for the Swagger UI.
persist_authorization : bool, optional
Whether to persist authorization data on browser close/refresh.
openapi_extensions : dict[str, Any], optional
Additional OpenAPI extensions as a dictionary.
def exception_handler(self, exc_class: type[Exception] | list[type[Exception]])
def get_openapi_json_schema(self, *, title: str = 'Powertools API', version: str = '1.0.0', openapi_version: str = '3.1.0', summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None, openapi_extensions: dict[str, Any] | None = None)

Returns the OpenAPI schema as a JSON serializable dict

Parameters

title : str
The title of the application.
version : str
The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
openapi_version : str, default = "3.0.0"
The version of the OpenAPI Specification (which the document uses).
summary : str, optional
A short summary of what the application does.
description : str, optional
A verbose explanation of the application behavior.
tags : list[Tag, str], optional
A list of tags used by the specification with additional metadata.
servers : list[Server], optional
An array of Server Objects, which provide connectivity information to a target server.
terms_of_service : str, optional
A URL to the Terms of Service for the API. MUST be in the format of a URL.
contact : Contact, optional
The contact information for the exposed API.
license_info : License, optional
The license information for the exposed API.
security_schemes : dict[str, SecurityScheme]], optional
A declaration of the security schemes available to be used in the specification.
security : list[dict[str, list[str]]], optional
A declaration of which security mechanisms are applied globally across the API.
openapi_extensions : Dict[str, Any], optional
Additional OpenAPI extensions as a dictionary.

Returns

str
The OpenAPI schema as a JSON serializable dict.
def get_openapi_schema(self, *, title: str = 'Powertools API', version: str = '1.0.0', openapi_version: str = '3.1.0', summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None, openapi_extensions: dict[str, Any] | None = None)

Returns the OpenAPI schema as a pydantic model.

Parameters

title : str
The title of the application.
version : str
The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
openapi_version : str, default = "3.0.0"
The version of the OpenAPI Specification (which the document uses).
summary : str, optional
A short summary of what the application does.
description : str, optional
A verbose explanation of the application behavior.
tags : list[Tag | str], optional
A list of tags used by the specification with additional metadata.
servers : list[Server], optional
An array of Server Objects, which provide connectivity information to a target server.
terms_of_service : str, optional
A URL to the Terms of Service for the API. MUST be in the format of a URL.
contact : Contact, optional
The contact information for the exposed API.
license_info : License, optional
The license information for the exposed API.
security_schemes : dict[str, SecurityScheme]], optional
A declaration of the security schemes available to be used in the specification.
security : list[dict[str, list[str]]], optional
A declaration of which security mechanisms are applied globally across the API.
openapi_extensions : Dict[str, Any], optional
Additional OpenAPI extensions as a dictionary.

Returns

OpenAPI : pydantic model
The OpenAPI schema as a pydantic model.
def include_router(self, router: Router, prefix: str | None = None) ‑> None

Adds all routes and context defined in a router

Parameters

router : Router
The Router containing a list of routes to be registered after the existing routes
prefix : str, optional
An optional prefix to be added to the originally defined rule
def not_found(self, func: Callable | None = None)
def resolve(self, event, context) ‑> dict[str, typing.Any]

Resolves the response based on the provide event and decorator routes

Internals

Request processing chain is triggered by a Route object being called (_call_route -> __call__):

  1. When a route is matched 1.1. Exception handlers (if any exception bubbled up and caught) 1.2. Global middlewares (before, and after on the way back) 1.3. Path level middleware (before, and after on the way back) 1.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 1.5. Run actual route
  2. When a route is NOT matched 2.1. Exception handlers (if any exception bubbled up and caught) 2.2. Global middlewares (before, and after on the way back) 2.3. Path level middleware (before, and after on the way back) 2.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 2.5. Run 404 route handler
  3. When a route is a pre-flight CORS (often not matched) 3.1. Exception handlers (if any exception bubbled up and caught) 3.2. Global middlewares (before, and after on the way back) 3.3. Path level middleware (before, and after on the way back) 3.4. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 3.5. Return 204 with appropriate CORS headers
  4. When a route is matched with Data Validation enabled 4.1. Exception handlers (if any exception bubbled up and caught) 4.2. Data Validation middleware (before, and after on the way back) 4.3. Global middlewares (before, and after on the way back) 4.4. Path level middleware (before, and after on the way back) 4.5. Middleware adapter to ensure Response is homogenous (_registered_api_adapter) 4.6. Run actual route

Parameters

event : dict[str, Any]
Event
context : LambdaContext
Lambda context

Returns

dict
Returns the dict response
def route(self, rule: str, method: str | list[str] | tuple[str], cors: bool | None = None, compress: bool = False, cache_control: str | None = None, summary: str | None = None, description: str | None = None, responses: dict[int, OpenAPIResponse] | None = None, response_description: str = 'Successful Response', tags: list[str] | None = None, operation_id: str | None = None, include_in_schema: bool = True, security: list[dict[str, list[str]]] | None = None, openapi_extensions: dict[str, Any] | None = None, middlewares: list[Callable[..., Any]] | None = None)

Route decorator includes parameter method

Inherited members

class AppSyncResolver

AppSync GraphQL API Resolver

Example

from aws_lambda_powertools.event_handler import AppSyncResolver

app = AppSyncResolver()

@app.resolver(type_name="Query", field_name="listLocations")
def list_locations(page: int = 0, size: int = 10) -> list:
    # Your logic to fetch locations with arguments passed in
    return [{"id": 100, "name": "Smooth Grooves"}]

@app.resolver(type_name="Merchant", field_name="extraInfo")
def get_extra_info() -> dict:
    # Can use "app.current_event.source" to filter within the parent context
    account_type = app.current_event.source["accountType"]
    method = "BTC" if account_type == "NEW" else "USD"
    return {"preferredPaymentMethod": method}

@app.resolver(field_name="commonField")
def common_field() -> str:
    # Would match all fieldNames matching 'commonField'
    return str(uuid.uuid4())

Initialize a new instance of the AppSyncResolver.

Expand source code
class AppSyncResolver(Router):
    """
    AppSync GraphQL API Resolver

    Example
    -------
    ```python
    from aws_lambda_powertools.event_handler import AppSyncResolver

    app = AppSyncResolver()

    @app.resolver(type_name="Query", field_name="listLocations")
    def list_locations(page: int = 0, size: int = 10) -> list:
        # Your logic to fetch locations with arguments passed in
        return [{"id": 100, "name": "Smooth Grooves"}]

    @app.resolver(type_name="Merchant", field_name="extraInfo")
    def get_extra_info() -> dict:
        # Can use "app.current_event.source" to filter within the parent context
        account_type = app.current_event.source["accountType"]
        method = "BTC" if account_type == "NEW" else "USD"
        return {"preferredPaymentMethod": method}

    @app.resolver(field_name="commonField")
    def common_field() -> str:
        # Would match all fieldNames matching 'commonField'
        return str(uuid.uuid4())
    ```
    """

    def __init__(self):
        """
        Initialize a new instance of the AppSyncResolver.
        """
        super().__init__()
        self.context = {}  # early init as customers might add context before event resolution

    def __call__(
        self,
        event: dict,
        context: LambdaContext,
        data_model: type[AppSyncResolverEvent] = AppSyncResolverEvent,
    ) -> Any:
        """Implicit lambda handler which internally calls `resolve`"""
        return self.resolve(event, context, data_model)

    def resolve(
        self,
        event: dict | list[dict],
        context: LambdaContext,
        data_model: type[AppSyncResolverEvent] = AppSyncResolverEvent,
    ) -> Any:
        """Resolves the response based on the provide event and decorator routes

        Parameters
        ----------
        event : dict | list[Dict]
            Lambda event either coming from batch processing endpoint or from standard processing endpoint
        context : LambdaContext
            Lambda context
        data_model:
            Your data data_model to decode AppSync event, by default AppSyncResolverEvent

        Example
        -------

        ```python
        from aws_lambda_powertools.event_handler import AppSyncResolver
        from aws_lambda_powertools.utilities.typing import LambdaContext

        @app.resolver(field_name="createSomething")
        def create_something(id: str):  # noqa AA03 VNE003
            return id

        def handler(event, context: LambdaContext):
            return app.resolve(event, context)
        ```

        **Bringing custom models**

        ```python
        from aws_lambda_powertools import Logger, Tracer

        from aws_lambda_powertools.logging import correlation_paths
        from aws_lambda_powertools.event_handler import AppSyncResolver

        tracer = Tracer(service="sample_resolver")
        logger = Logger(service="sample_resolver")
        app = AppSyncResolver()


        class MyCustomModel(AppSyncResolverEvent):
            @property
            def country_viewer(self) -> str:
                return self.request_headers.get("cloudfront-viewer-country", "")


        @app.resolver(field_name="listLocations")
        @app.resolver(field_name="locations")
        def get_locations(name: str, description: str = ""):
            if app.current_event.country_viewer == "US":
                ...
            return name + description


        @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
        @tracer.capture_lambda_handler
        def lambda_handler(event, context):
            return app.resolve(event, context, data_model=MyCustomModel)
        ```

        Returns
        -------
        Any
            Returns the result of the resolver

        Raises
        -------
        ValueError
            If we could not find a field resolver
        """

        self.lambda_context = context
        Router.lambda_context = context

        if isinstance(event, list):
            Router.current_batch_event = [data_model(e) for e in event]
            response = self._call_batch_resolver(event=event, data_model=data_model)
        else:
            Router.current_event = data_model(event)
            response = self._call_single_resolver(event=event, data_model=data_model)

        # We don't clear the context for coroutines because we don't have control over the event loop.
        # If we clean the context immediately, it might not be available when the coroutine is actually executed.
        # For single async operations, the context should be cleaned up manually after the coroutine completes.
        # See: https://github.com/aws-powertools/powertools-lambda-python/issues/5290
        # REVIEW: Review this support in Powertools V4
        if not asyncio.iscoroutine(response):
            self.clear_context()

        return response

    def _call_single_resolver(self, event: dict, data_model: type[AppSyncResolverEvent]) -> Any:
        """Call single event resolver

        Parameters
        ----------
        event : dict
            Event
        data_model : type[AppSyncResolverEvent]
            Data_model to decode AppSync event, by default it is of AppSyncResolverEvent type or subclass of it
        """

        logger.debug("Processing direct resolver event")

        self.current_event = data_model(event)
        resolver = self._resolver_registry.find_resolver(self.current_event.type_name, self.current_event.field_name)
        if not resolver:
            raise ValueError(f"No resolver found for '{self.current_event.type_name}.{self.current_event.field_name}'")
        return resolver["func"](**self.current_event.arguments)

    def _call_sync_batch_resolver(
        self,
        resolver: Callable,
        raise_on_error: bool = False,
        aggregate: bool = True,
    ) -> list[Any]:
        """
        Calls a synchronous batch resolver function for each event in the current batch.

        Parameters
        ----------
        resolver: Callable
            The callable function to resolve events.
        raise_on_error: bool
            A flag indicating whether to raise an error when processing batches
            with failed items. Defaults to False, which means errors are handled without raising exceptions.
        aggregate: bool
            A flag indicating whether the batch items should be processed at once or individually.
            If True (default), the batch resolver will process all items in the batch as a single event.
            If False, the batch resolver will process each item in the batch individually.

        Returns
        -------
        list[Any]
            A list of results corresponding to the resolved events.
        """

        logger.debug(f"Graceful error handling flag {raise_on_error=}")

        # Checks whether the entire batch should be processed at once
        if aggregate:
            # Process the entire batch
            response = resolver(event=self.current_batch_event)

            if not isinstance(response, list):
                raise InvalidBatchResponse("The response must be a List when using batch resolvers")

            return response

        # Non aggregated events, so we call this event list x times
        # Stop on first exception we encounter
        if raise_on_error:
            return [
                resolver(event=appconfig_event, **appconfig_event.arguments)
                for appconfig_event in self.current_batch_event
            ]

        # By default, we gracefully append `None` for any records that failed processing
        results = []
        for idx, event in enumerate(self.current_batch_event):
            try:
                results.append(resolver(event=event, **event.arguments))
            except Exception:
                logger.debug(f"Failed to process event number {idx} from field '{event.info.field_name}'")
                results.append(None)

        return results

    async def _call_async_batch_resolver(
        self,
        resolver: Callable,
        raise_on_error: bool = False,
        aggregate: bool = True,
    ) -> list[Any]:
        """
        Asynchronously call a batch resolver for each event in the current batch.

        Parameters
        ----------
        resolver: Callable
            The asynchronous resolver function.
        raise_on_error: bool
            A flag indicating whether to raise an error when processing batches
            with failed items. Defaults to False, which means errors are handled without raising exceptions.
        aggregate: bool
            A flag indicating whether the batch items should be processed at once or individually.
            If True (default), the batch resolver will process all items in the batch as a single event.
            If False, the batch resolver will process each item in the batch individually.

        Returns
        -------
        list[Any]
            A list of results corresponding to the resolved events.
        """

        logger.debug(f"Graceful error handling flag {raise_on_error=}")

        # Checks whether the entire batch should be processed at once
        if aggregate:
            # Process the entire batch
            ret = await resolver(event=self.current_batch_event)
            if not isinstance(ret, list):
                raise InvalidBatchResponse("The response must be a List when using batch resolvers")

            return ret

        response: list = []

        # Prime coroutines
        tasks = [resolver(event=e, **e.arguments) for e in self.current_batch_event]

        # Aggregate results or raise at first error
        if raise_on_error:
            response.extend(await asyncio.gather(*tasks))
            return response

        # Aggregate results and exceptions, then filter them out
        # Use `None` upon exception for graceful error handling at GraphQL engine level
        #
        # NOTE: asyncio.gather(return_exceptions=True) catches and includes exceptions in the results
        #       this will become useful when we support exception handling in AppSync resolver
        results = await asyncio.gather(*tasks, return_exceptions=True)
        response.extend(None if isinstance(ret, Exception) else ret for ret in results)

        return response

    def _call_batch_resolver(self, event: list[dict], data_model: type[AppSyncResolverEvent]) -> list[Any]:
        """Call batch event resolver for sync and async methods

        Parameters
        ----------
        event : list[dict]
            Batch event
        data_model : type[AppSyncResolverEvent]
            Data_model to decode AppSync event, by default AppSyncResolverEvent or a subclass

        Returns
        -------
        list[Any]
            Results of the resolver execution.

        Raises
        ------
        InconsistentPayloadError:
            When all events in the batch do not have the same fieldName.

        ResolverNotFoundError:
            When no resolver is found for the specified type and field.
        """
        logger.debug("Processing batch resolver event")

        self.current_batch_event = [data_model(e) for e in event]
        type_name, field_name = self.current_batch_event[0].type_name, self.current_batch_event[0].field_name

        resolver = self._batch_resolver_registry.find_resolver(type_name, field_name)
        async_resolver = self._async_batch_resolver_registry.find_resolver(type_name, field_name)

        if resolver and async_resolver:
            warnings.warn(
                f"Both synchronous and asynchronous resolvers found for the same event and field."
                f"The synchronous resolver takes precedence. Executing: {resolver['func'].__name__}",
                stacklevel=2,
                category=PowertoolsUserWarning,
            )

        if resolver:
            logger.debug(f"Found sync resolver. {resolver=}, {field_name=}")
            return self._call_sync_batch_resolver(
                resolver=resolver["func"],
                raise_on_error=resolver["raise_on_error"],
                aggregate=resolver["aggregate"],
            )

        if async_resolver:
            logger.debug(f"Found async resolver. {resolver=}, {field_name=}")
            return asyncio.run(
                self._call_async_batch_resolver(
                    resolver=async_resolver["func"],
                    raise_on_error=async_resolver["raise_on_error"],
                    aggregate=async_resolver["aggregate"],
                ),
            )

        raise ResolverNotFoundError(f"No resolver found for '{type_name}.{field_name}'")

    def include_router(self, router: Router) -> None:
        """Adds all resolvers defined in a router

        Parameters
        ----------
        router : Router
            A router containing a dict of field resolvers
        """

        # Merge app and router context
        logger.debug("Merging router and app context")
        self.context.update(**router.context)

        # use pointer to allow context clearance after event is processed e.g., resolve(evt, ctx)
        router.context = self.context

        logger.debug("Merging router resolver registries")
        self._resolver_registry.merge(router._resolver_registry)
        self._batch_resolver_registry.merge(router._batch_resolver_registry)
        self._async_batch_resolver_registry.merge(router._async_batch_resolver_registry)

    def resolver(self, type_name: str = "*", field_name: str | None = None) -> Callable:
        """Registers direct resolver function for GraphQL type and field name.

        Parameters
        ----------
        type_name : str, optional
            GraphQL type e.g., Query, Mutation, by default "*" meaning any
        field_name : Optional[str], optional
            GraphQL field e.g., getTodo, createTodo, by default None

        Returns
        -------
        Callable
            Registered resolver

        Example
        -------

        ```python
        from aws_lambda_powertools.event_handler import AppSyncResolver

        from typing import TypedDict

        app = AppSyncResolver()

        class Todo(TypedDict, total=False):
            id: str
            userId: str
            title: str
            completed: bool

        # resolve any GraphQL `getTodo` queries
        # arguments are injected as function arguments as-is
        @app.resolver(type_name="Query", field_name="getTodo")
        def get_todo(id: str = "", status: str = "open") -> Todo:
            todos: Response = requests.get(f"https://jsonplaceholder.typicode.com/todos/{id}")
            todos.raise_for_status()

            return todos.json()

        def lambda_handler(event, context):
            return app.resolve(event, context)
        ```
        """
        return self._resolver_registry.register(field_name=field_name, type_name=type_name)

    def batch_resolver(
        self,
        type_name: str = "*",
        field_name: str | None = None,
        raise_on_error: bool = False,
        aggregate: bool = True,
    ) -> Callable:
        """Registers batch resolver function for GraphQL type and field name.

        By default, we handle errors gracefully by returning `None`. If you want
        to short-circuit and fail the entire batch use `raise_on_error=True`.

        Parameters
        ----------
        type_name : str, optional
            GraphQL type e.g., Query, Mutation, by default "*" meaning any
        field_name : Optional[str], optional
            GraphQL field e.g., getTodo, createTodo, by default None
        raise_on_error : bool, optional
            Whether to fail entire batch upon error, or handle errors gracefully (None), by default False
        aggregate: bool
            A flag indicating whether the batch items should be processed at once or individually.
            If True (default), the batch resolver will process all items in the batch as a single event.
            If False, the batch resolver will process each item in the batch individually.

        Returns
        -------
        Callable
            Registered resolver
        """
        return self._batch_resolver_registry.register(
            field_name=field_name,
            type_name=type_name,
            raise_on_error=raise_on_error,
            aggregate=aggregate,
        )

    def async_batch_resolver(
        self,
        type_name: str = "*",
        field_name: str | None = None,
        raise_on_error: bool = False,
        aggregate: bool = True,
    ) -> Callable:
        return self._async_batch_resolver_registry.register(
            field_name=field_name,
            type_name=type_name,
            raise_on_error=raise_on_error,
            aggregate=aggregate,
        )

Ancestors

Methods

def batch_resolver(self, type_name: str = '*', field_name: str | None = None, raise_on_error: bool = False, aggregate: bool = True) ‑> Callable

Registers batch resolver function for GraphQL type and field name.

By default, we handle errors gracefully by returning None. If you want to short-circuit and fail the entire batch use raise_on_error=True.

Parameters

type_name : str, optional
GraphQL type e.g., Query, Mutation, by default "*" meaning any
field_name : Optional[str], optional
GraphQL field e.g., getTodo, createTodo, by default None
raise_on_error : bool, optional
Whether to fail entire batch upon error, or handle errors gracefully (None), by default False
aggregate : bool
A flag indicating whether the batch items should be processed at once or individually. If True (default), the batch resolver will process all items in the batch as a single event. If False, the batch resolver will process each item in the batch individually.

Returns

Callable
Registered resolver
def include_router(self, router: Router) ‑> None

Adds all resolvers defined in a router

Parameters

router : Router
A router containing a dict of field resolvers
def resolve(self, event: dict | list[dict], context: LambdaContext, data_model: type[AppSyncResolverEvent] = aws_lambda_powertools.utilities.data_classes.appsync_resolver_event.AppSyncResolverEvent)

Resolves the response based on the provide event and decorator routes

Parameters

event : dict | list[Dict]
Lambda event either coming from batch processing endpoint or from standard processing endpoint
context : LambdaContext
Lambda context

data_model: Your data data_model to decode AppSync event, by default AppSyncResolverEvent

Example

from aws_lambda_powertools.event_handler import AppSyncResolver
from aws_lambda_powertools.utilities.typing import LambdaContext

@app.resolver(field_name="createSomething")
def create_something(id: str):  # noqa AA03 VNE003
    return id

def handler(event, context: LambdaContext):
    return app.resolve(event, context)

Bringing custom models

from aws_lambda_powertools import Logger, Tracer

from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.event_handler import AppSyncResolver

tracer = Tracer(service="sample_resolver")
logger = Logger(service="sample_resolver")
app = AppSyncResolver()


class MyCustomModel(AppSyncResolverEvent):
    @property
    def country_viewer(self) -> str:
        return self.request_headers.get("cloudfront-viewer-country", "")


@app.resolver(field_name="listLocations")
@app.resolver(field_name="locations")
def get_locations(name: str, description: str = ""):
    if app.current_event.country_viewer == "US":
        ...
    return name + description


@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
@tracer.capture_lambda_handler
def lambda_handler(event, context):
    return app.resolve(event, context, data_model=MyCustomModel)

Returns

Any
Returns the result of the resolver

Raises

ValueError
If we could not find a field resolver
def resolver(self, type_name: str = '*', field_name: str | None = None) ‑> Callable

Registers direct resolver function for GraphQL type and field name.

Parameters

type_name : str, optional
GraphQL type e.g., Query, Mutation, by default "*" meaning any
field_name : Optional[str], optional
GraphQL field e.g., getTodo, createTodo, by default None

Returns

Callable
Registered resolver

Example

from aws_lambda_powertools.event_handler import AppSyncResolver

from typing import TypedDict

app = AppSyncResolver()

class Todo(TypedDict, total=False):
    id: str
    userId: str
    title: str
    completed: bool

# resolve any GraphQL `getTodo` queries
# arguments are injected as function arguments as-is
@app.resolver(type_name="Query", field_name="getTodo")
def get_todo(id: str = "", status: str = "open") -> Todo:
    todos: Response = requests.get(f"https://jsonplaceholder.typicode.com/todos/{id}")
    todos.raise_for_status()

    return todos.json()

def lambda_handler(event, context):
    return app.resolve(event, context)

Inherited members

class BedrockAgentResolver (debug: bool = False, enable_validation: bool = True)

Bedrock Agent Resolver

See https://aws.amazon.com/bedrock/agents/ for more information.

Examples

Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.event_handler import BedrockAgentResolver

tracer = Tracer()
app = BedrockAgentResolver()

@app.get("/claims")
def simple_get():
    return "You have 3 claims"

@tracer.capture_lambda_handler
def lambda_handler(event, context):
    return app.resolve(event, context)

Parameters

proxy_type : ProxyEventType
Proxy request type, defaults to API Gateway V1
cors : CORSConfig
Optionally configure and enabled CORS. Not each route will need to have to cors=True
debug : bool | None
Enables debug mode, by default False. Can be also be enabled by "POWERTOOLS_DEV" environment variable
serializer : Callable, optional
function to serialize obj to a JSON formatted str, by default json.dumps
strip_prefixes : list[str | Pattern], optional
optional list of prefixes to be removed from the request path before doing the routing. This is often used with api gateways with multiple custom mappings. Each prefix can be a static string or a compiled regex pattern
enable_validation : bool | None
Enables validation of the request body against the route schema, by default False.
Expand source code
class BedrockAgentResolver(ApiGatewayResolver):
    """Bedrock Agent Resolver

    See https://aws.amazon.com/bedrock/agents/ for more information.

    Examples
    --------
    Simple example with a custom lambda handler using the Tracer capture_lambda_handler decorator

    ```python
    from aws_lambda_powertools import Tracer
    from aws_lambda_powertools.event_handler import BedrockAgentResolver

    tracer = Tracer()
    app = BedrockAgentResolver()

    @app.get("/claims")
    def simple_get():
        return "You have 3 claims"

    @tracer.capture_lambda_handler
    def lambda_handler(event, context):
        return app.resolve(event, context)
    ```

    """

    current_event: BedrockAgentEvent

    def __init__(self, debug: bool = False, enable_validation: bool = True):
        super().__init__(
            proxy_type=ProxyEventType.BedrockAgentEvent,
            cors=None,
            debug=debug,
            serializer=None,
            strip_prefixes=None,
            enable_validation=enable_validation,
        )
        self._response_builder_class = BedrockResponseBuilder

    # Note: we need ignore[override] because we are making the optional `description` field required.
    @override
    def get(  # type: ignore[override]
        self,
        rule: str,
        description: str,
        cors: bool | None = None,
        compress: bool = False,
        cache_control: str | None = None,
        summary: str | None = None,
        responses: dict[int, OpenAPIResponse] | None = None,
        response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION,
        tags: list[str] | None = None,
        operation_id: str | None = None,
        include_in_schema: bool = True,
        middlewares: list[Callable[..., Any]] | None = None,
    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:

        openapi_extensions = None
        security = None

        return super().get(
            rule,
            cors,
            compress,
            cache_control,
            summary,
            description,
            responses,
            response_description,
            tags,
            operation_id,
            include_in_schema,
            security,
            openapi_extensions,
            middlewares,
        )

    # Note: we need ignore[override] because we are making the optional `description` field required.
    @override
    def post(  # type: ignore[override]
        self,
        rule: str,
        description: str,
        cors: bool | None = None,
        compress: bool = False,
        cache_control: str | None = None,
        summary: str | None = None,
        responses: dict[int, OpenAPIResponse] | None = None,
        response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION,
        tags: list[str] | None = None,
        operation_id: str | None = None,
        include_in_schema: bool = True,
        middlewares: list[Callable[..., Any]] | None = None,
    ):
        openapi_extensions = None
        security = None

        return super().post(
            rule,
            cors,
            compress,
            cache_control,
            summary,
            description,
            responses,
            response_description,
            tags,
            operation_id,
            include_in_schema,
            security,
            openapi_extensions,
            middlewares,
        )

    # Note: we need ignore[override] because we are making the optional `description` field required.
    @override
    def put(  # type: ignore[override]
        self,
        rule: str,
        description: str,
        cors: bool | None = None,
        compress: bool = False,
        cache_control: str | None = None,
        summary: str | None = None,
        responses: dict[int, OpenAPIResponse] | None = None,
        response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION,
        tags: list[str] | None = None,
        operation_id: str | None = None,
        include_in_schema: bool = True,
        middlewares: list[Callable[..., Any]] | None = None,
    ):
        openapi_extensions = None
        security = None

        return super().put(
            rule,
            cors,
            compress,
            cache_control,
            summary,
            description,
            responses,
            response_description,
            tags,
            operation_id,
            include_in_schema,
            security,
            openapi_extensions,
            middlewares,
        )

    # Note: we need ignore[override] because we are making the optional `description` field required.
    @override
    def patch(  # type: ignore[override]
        self,
        rule: str,
        description: str,
        cors: bool | None = None,
        compress: bool = False,
        cache_control: str | None = None,
        summary: str | None = None,
        responses: dict[int, OpenAPIResponse] | None = None,
        response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION,
        tags: list[str] | None = None,
        operation_id: str | None = None,
        include_in_schema: bool = True,
        middlewares: list[Callable] | None = None,
    ):
        openapi_extensions = None
        security = None

        return super().patch(
            rule,
            cors,
            compress,
            cache_control,
            summary,
            description,
            responses,
            response_description,
            tags,
            operation_id,
            include_in_schema,
            security,
            openapi_extensions,
            middlewares,
        )

    # Note: we need ignore[override] because we are making the optional `description` field required.
    @override
    def delete(  # type: ignore[override]
        self,
        rule: str,
        description: str,
        cors: bool | None = None,
        compress: bool = False,
        cache_control: str | None = None,
        summary: str | None = None,
        responses: dict[int, OpenAPIResponse] | None = None,
        response_description: str = _DEFAULT_OPENAPI_RESPONSE_DESCRIPTION,
        tags: list[str] | None = None,
        operation_id: str | None = None,
        include_in_schema: bool = True,
        middlewares: list[Callable[..., Any]] | None = None,
    ):
        openapi_extensions = None
        security = None

        return super().delete(
            rule,
            cors,
            compress,
            cache_control,
            summary,
            description,
            responses,
            response_description,
            tags,
            operation_id,
            include_in_schema,
            security,
            openapi_extensions,
            middlewares,
        )

    @override
    def _convert_matches_into_route_keys(self, match: Match) -> dict[str, str]:
        # In Bedrock Agents, all the parameters come inside the "parameters" key, not on the apiPath
        # So we have to search for route parameters in the parameters key
        parameters: dict[str, str] = {}
        if match.groupdict() and self.current_event.parameters:
            parameters = {parameter["name"]: parameter["value"] for parameter in self.current_event.parameters}
        return parameters

    @override
    def get_openapi_json_schema(  # type: ignore[override]
        self,
        *,
        title: str = "Powertools API",
        version: str = DEFAULT_API_VERSION,
        openapi_version: str = DEFAULT_OPENAPI_VERSION,
        summary: str | None = None,
        description: str | None = None,
        tags: list[Tag | str] | None = None,
        servers: list[Server] | None = None,
        terms_of_service: str | None = None,
        contact: Contact | None = None,
        license_info: License | None = None,
        security_schemes: dict[str, SecurityScheme] | None = None,
        security: list[dict[str, list[str]]] | None = None,
    ) -> str:
        """
        Returns the OpenAPI schema as a JSON serializable dict.
        Since Bedrock Agents only support OpenAPI 3.0.0, we convert OpenAPI 3.1.0 schemas
        and enforce 3.0.0 compatibility for seamless integration.

        Parameters
        ----------
        title: str
            The title of the application.
        version: str
            The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
        openapi_version: str, default = "3.0.0"
            The version of the OpenAPI Specification (which the document uses).
        summary: str, optional
            A short summary of what the application does.
        description: str, optional
            A verbose explanation of the application behavior.
        tags: list[Tag, str], optional
            A list of tags used by the specification with additional metadata.
        servers: list[Server], optional
            An array of Server Objects, which provide connectivity information to a target server.
        terms_of_service: str, optional
            A URL to the Terms of Service for the API. MUST be in the format of a URL.
        contact: Contact, optional
            The contact information for the exposed API.
        license_info: License, optional
            The license information for the exposed API.
        security_schemes: dict[str, SecurityScheme]], optional
            A declaration of the security schemes available to be used in the specification.
        security: list[dict[str, list[str]]], optional
            A declaration of which security mechanisms are applied globally across the API.

        Returns
        -------
        str
            The OpenAPI schema as a JSON serializable dict.
        """
        from aws_lambda_powertools.event_handler.openapi.compat import model_json

        openapi_extensions = None

        schema = super().get_openapi_schema(
            title=title,
            version=version,
            openapi_version=openapi_version,
            summary=summary,
            description=description,
            tags=tags,
            servers=servers,
            terms_of_service=terms_of_service,
            contact=contact,
            license_info=license_info,
            security_schemes=security_schemes,
            security=security,
            openapi_extensions=openapi_extensions,
        )
        schema.openapi = "3.0.3"

        # Transform OpenAPI 3.1 into 3.0
        def inner(yaml_dict):
            if isinstance(yaml_dict, dict):
                if "anyOf" in yaml_dict and isinstance((anyOf := yaml_dict["anyOf"]), list):
                    for i, item in enumerate(anyOf):
                        if isinstance(item, dict) and item.get("type") == "null":
                            anyOf.pop(i)
                            yaml_dict["nullable"] = True
                if "examples" in yaml_dict:
                    examples = yaml_dict["examples"]
                    del yaml_dict["examples"]
                    if isinstance(examples, list) and len(examples):
                        yaml_dict["example"] = examples[0]
                for value in yaml_dict.values():
                    inner(value)
            elif isinstance(yaml_dict, list):
                for item in yaml_dict:
                    inner(item)

        model = json.loads(
            model_json(
                schema,
                by_alias=True,
                exclude_none=True,
                indent=2,
            ),
        )

        inner(model)

        return json.dumps(model)

Ancestors

Class variables

var current_event : BedrockAgentEvent

Methods

def get_openapi_json_schema(self, *, title: str = 'Powertools API', version: str = '1.0.0', openapi_version: str = '3.1.0', summary: str | None = None, description: str | None = None, tags: list[Tag | str] | None = None, servers: list[Server] | None = None, terms_of_service: str | None = None, contact: Contact | None = None, license_info: License | None = None, security_schemes: dict[str, SecurityScheme] | None = None, security: list[dict[str, list[str]]] | None = None)

Returns the OpenAPI schema as a JSON serializable dict. Since Bedrock Agents only support OpenAPI 3.0.0, we convert OpenAPI 3.1.0 schemas and enforce 3.0.0 compatibility for seamless integration.

Parameters

title : str
The title of the application.
version : str
The version of the OpenAPI document (which is distinct from the OpenAPI Specification version or the API
openapi_version : str, default = "3.0.0"
The version of the OpenAPI Specification (which the document uses).
summary : str, optional
A short summary of what the application does.
description : str, optional
A verbose explanation of the application behavior.
tags : list[Tag, str], optional
A list of tags used by the specification with additional metadata.
servers : list[Server], optional
An array of Server Objects, which provide connectivity information to a target server.
terms_of_service : str, optional
A URL to the Terms of Service for the API. MUST be in the format of a URL.
contact : Contact, optional
The contact information for the exposed API.
license_info : License, optional
The license information for the exposed API.
security_schemes : dict[str, SecurityScheme]], optional
A declaration of the security schemes available to be used in the specification.
security : list[dict[str, list[str]]], optional
A declaration of which security mechanisms are applied globally across the API.

Returns

str
The OpenAPI schema as a JSON serializable dict.

Inherited members

class CORSConfig (allow_origin: str = '*', extra_origins: list[str] | None = None, allow_headers: list[str] | None = None, expose_headers: list[str] | None = None, max_age: int | None = None, allow_credentials: bool = False)

CORS Config

Examples

Simple CORS example using the default permissive CORS, note that this should only be used during early prototyping.

from aws_lambda_powertools.event_handler.api_gateway import (
    APIGatewayRestResolver, CORSConfig
)

app = APIGatewayRestResolver(cors=CORSConfig())

@app.get("/my/path")
def with_cors():
    return {"message": "Foo"}

Using a custom CORSConfig where with_cors used the custom provided CORSConfig and without_cors do not include any CORS headers.

from aws_lambda_powertools.event_handler.api_gateway import (
    APIGatewayRestResolver, CORSConfig
)

cors_config = CORSConfig(
    allow_origin="https://wwww.example.com/",
    extra_origins=["https://dev.example.com/"],
    expose_headers=["x-exposed-response-header"],
    allow_headers=["x-custom-request-header"],
    max_age=100,
    allow_credentials=True,
)
app = APIGatewayRestResolver(cors=cors_config)

@app.get("/my/path")
def with_cors():
    return {"message": "Foo"}

@app.get("/another-one", cors=False)
def without_cors():
    return {"message": "Foo"}

Parameters

allow_origin : str
The value of the Access-Control-Allow-Origin to send in the response. Defaults to "*", but should only be used during development.
extra_origins : list[str] | None
The list of additional allowed origins.
allow_headers : list[str] | None
The list of additional allowed headers. This list is added to list of built-in allowed headers: Authorization, Content-Type, X-Amz-Date, X-Api-Key, X-Amz-Security-Token.
expose_headers : list[str] | None
A list of values to return for the Access-Control-Expose-Headers
max_age : int | None
The value for the Access-Control-Max-Age
allow_credentials : bool
A boolean value that sets the value of Access-Control-Allow-Credentials
Expand source code
class CORSConfig:
    """CORS Config

    Examples
    --------

    Simple CORS example using the default permissive CORS, note that this should only be used during early prototyping.

    ```python
    from aws_lambda_powertools.event_handler.api_gateway import (
        APIGatewayRestResolver, CORSConfig
    )

    app = APIGatewayRestResolver(cors=CORSConfig())

    @app.get("/my/path")
    def with_cors():
        return {"message": "Foo"}
    ```

    Using a custom CORSConfig where `with_cors` used the custom provided CORSConfig and `without_cors`
    do not include any CORS headers.

    ```python
    from aws_lambda_powertools.event_handler.api_gateway import (
        APIGatewayRestResolver, CORSConfig
    )

    cors_config = CORSConfig(
        allow_origin="https://wwww.example.com/",
        extra_origins=["https://dev.example.com/"],
        expose_headers=["x-exposed-response-header"],
        allow_headers=["x-custom-request-header"],
        max_age=100,
        allow_credentials=True,
    )
    app = APIGatewayRestResolver(cors=cors_config)

    @app.get("/my/path")
    def with_cors():
        return {"message": "Foo"}

    @app.get("/another-one", cors=False)
    def without_cors():
        return {"message": "Foo"}
    ```
    """

    _REQUIRED_HEADERS = ["Authorization", "Content-Type", "X-Amz-Date", "X-Api-Key", "X-Amz-Security-Token"]

    def __init__(
        self,
        allow_origin: str = "*",
        extra_origins: list[str] | None = None,
        allow_headers: list[str] | None = None,
        expose_headers: list[str] | None = None,
        max_age: int | None = None,
        allow_credentials: bool = False,
    ):
        """
        Parameters
        ----------
        allow_origin: str
            The value of the `Access-Control-Allow-Origin` to send in the response. Defaults to "*", but should
            only be used during development.
        extra_origins: list[str] | None
            The list of additional allowed origins.
        allow_headers: list[str] | None
            The list of additional allowed headers. This list is added to list of
            built-in allowed headers: `Authorization`, `Content-Type`, `X-Amz-Date`,
            `X-Api-Key`, `X-Amz-Security-Token`.
        expose_headers: list[str] | None
            A list of values to return for the Access-Control-Expose-Headers
        max_age: int | None
            The value for the `Access-Control-Max-Age`
        allow_credentials: bool
            A boolean value that sets the value of `Access-Control-Allow-Credentials`
        """

        self._allowed_origins = [allow_origin]

        if extra_origins:
            self._allowed_origins.extend(extra_origins)

        self.allow_headers = set(self._REQUIRED_HEADERS + (allow_headers or []))
        self.expose_headers = expose_headers or []
        self.max_age = max_age
        self.allow_credentials = allow_credentials

    def to_dict(self, origin: str | None) -> dict[str, str]:
        """Builds the configured Access-Control http headers"""

        # If there's no Origin, don't add any CORS headers
        if not origin:
            return {}

        # If the origin doesn't match any of the allowed origins, and we don't allow all origins ("*"),
        # don't add any CORS headers
        if origin not in self._allowed_origins and "*" not in self._allowed_origins:
            return {}

        # The origin matched an allowed origin, so return the CORS headers
        headers = {
            "Access-Control-Allow-Origin": origin,
            "Access-Control-Allow-Headers": CORSConfig.build_allow_methods(self.allow_headers),
        }

        if self.expose_headers:
            headers["Access-Control-Expose-Headers"] = ",".join(self.expose_headers)
        if self.max_age is not None:
            headers["Access-Control-Max-Age"] = str(self.max_age)
        if origin != "*" and self.allow_credentials is True:
            headers["Access-Control-Allow-Credentials"] = "true"
        return headers

    def allowed_origin(self, extracted_origin: str) -> str | None:
        if extracted_origin in self._allowed_origins:
            return extracted_origin
        if extracted_origin is not None and "*" in self._allowed_origins:
            return "*"

        return None

    @staticmethod
    def build_allow_methods(methods: set[str]) -> str:
        """Build sorted comma delimited methods for Access-Control-Allow-Methods header

        Parameters
        ----------
        methods : set[str]
            Set of HTTP Methods

        Returns
        -------
        set[str]
            Formatted string with all HTTP Methods allowed for CORS e.g., `GET, OPTIONS`

        """
        return ",".join(sorted(methods))

Static methods

def build_allow_methods(methods: set[str]) ‑> str

Build sorted comma delimited methods for Access-Control-Allow-Methods header

Parameters

methods : set[str]
Set of HTTP Methods

Returns

set[str]
Formatted string with all HTTP Methods allowed for CORS e.g., GET, OPTIONS

Methods

def allowed_origin(self, extracted_origin: str) ‑> str | None
def to_dict(self, origin: str | None) ‑> dict[str, str]

Builds the configured Access-Control http headers

class LambdaFunctionUrlResolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)

AWS Lambda Function URL resolver

Notes:

Lambda Function URL follows the API Gateway HTTP APIs Payload Format Version 2.0.

Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html - https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html#urls-payloads

Examples

Simple example integrating with Tracer

```python from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import LambdaFunctionUrlResolver

tracer = Tracer() app = LambdaFunctionUrlResolver()

@app.get("/get-call") def simple_get(): return {"message": "Foo"}

@app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data}

@tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context)

Parameters

proxy_type : ProxyEventType
Proxy request type, defaults to API Gateway V1
cors : CORSConfig
Optionally configure and enabled CORS. Not each route will need to have to cors=True
debug : bool | None
Enables debug mode, by default False. Can be also be enabled by "POWERTOOLS_DEV" environment variable
serializer : Callable, optional
function to serialize obj to a JSON formatted str, by default json.dumps
strip_prefixes : list[str | Pattern], optional
optional list of prefixes to be removed from the request path before doing the routing. This is often used with api gateways with multiple custom mappings. Each prefix can be a static string or a compiled regex pattern
enable_validation : bool | None
Enables validation of the request body against the route schema, by default False.
Expand source code
class LambdaFunctionUrlResolver(ApiGatewayResolver):
    """AWS Lambda Function URL resolver

    Notes:
    -----
    Lambda Function URL follows the API Gateway HTTP APIs Payload Format Version 2.0.

    Documentation:
    - https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html
    - https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html#urls-payloads

    Examples
    --------
    Simple example integrating with Tracer

    ```python
    from aws_lambda_powertools import Tracer
    from aws_lambda_powertools.event_handler import LambdaFunctionUrlResolver

    tracer = Tracer()
    app = LambdaFunctionUrlResolver()

    @app.get("/get-call")
    def simple_get():
        return {"message": "Foo"}

    @app.post("/post-call")
    def simple_post():
        post_data: dict = app.current_event.json_body
        return {"message": post_data}

    @tracer.capture_lambda_handler
    def lambda_handler(event, context):
        return app.resolve(event, context)
    """

    current_event: LambdaFunctionUrlEvent

    def __init__(
        self,
        cors: CORSConfig | None = None,
        debug: bool | None = None,
        serializer: Callable[[dict], str] | None = None,
        strip_prefixes: list[str | Pattern] | None = None,
        enable_validation: bool = False,
    ):
        super().__init__(
            ProxyEventType.LambdaFunctionUrlEvent,
            cors,
            debug,
            serializer,
            strip_prefixes,
            enable_validation,
        )

    def _get_base_path(self) -> str:
        stage = self.current_event.request_context.stage
        if stage and stage != "$default" and self.current_event.request_context.http.method.startswith(f"/{stage}"):
            return f"/{stage}"
        return ""

Ancestors

Class variables

var current_event : LambdaFunctionUrlEvent

Inherited members

class Response (status_code: int, content_type: str | None = None, body: ResponseT | None = None, headers: Mapping[str, str | list[str]] | None = None, cookies: list[Cookie] | None = None, compress: bool | None = None)

Response data class that provides greater control over what is returned from the proxy event

Parameters

status_code : int
Http status code, example 200
content_type : str
Optionally set the Content-Type header, example "application/json". Note this will be merged into any provided http headers
body : str | bytes | None
Optionally set the response body. Note: bytes body will be automatically base64 encoded
headers : Mapping[str, str | list[str]]
Optionally set specific http headers. Setting "Content-Type" here would override the content_type value.
cookies : list[Cookie]
Optionally set cookies.
Expand source code
class Response(Generic[ResponseT]):
    """Response data class that provides greater control over what is returned from the proxy event"""

    def __init__(
        self,
        status_code: int,
        content_type: str | None = None,
        body: ResponseT | None = None,
        headers: Mapping[str, str | list[str]] | None = None,
        cookies: list[Cookie] | None = None,
        compress: bool | None = None,
    ):
        """

        Parameters
        ----------
        status_code: int
            Http status code, example 200
        content_type: str
            Optionally set the Content-Type header, example "application/json". Note this will be merged into any
            provided http headers
        body: str | bytes | None
            Optionally set the response body. Note: bytes body will be automatically base64 encoded
        headers: Mapping[str, str | list[str]]
            Optionally set specific http headers. Setting "Content-Type" here would override the `content_type` value.
        cookies: list[Cookie]
            Optionally set cookies.
        """
        self.status_code = status_code
        self.body = body
        self.base64_encoded = False
        self.headers: dict[str, str | list[str]] = dict(headers) if headers else {}
        self.cookies = cookies or []
        self.compress = compress
        self.content_type = content_type
        if content_type:
            self.headers.setdefault("Content-Type", content_type)

    def is_json(self) -> bool:
        """
        Returns True if the response is JSON, based on the Content-Type.
        """
        content_type = self.headers.get("Content-Type", "")
        if isinstance(content_type, list):
            content_type = content_type[0]
        return content_type.startswith("application/json")

Ancestors

  • typing.Generic

Methods

def is_json(self) ‑> bool

Returns True if the response is JSON, based on the Content-Type.

class VPCLatticeResolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)

VPC Lattice resolver

Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html#vpc-lattice-receiving-events

Examples

Simple example integrating with Tracer

```python from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import VPCLatticeResolver

tracer = Tracer() app = VPCLatticeResolver()

@app.get("/get-call") def simple_get(): return {"message": "Foo"}

@app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data}

@tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context)

Amazon VPC Lattice resolver

Expand source code
class VPCLatticeResolver(ApiGatewayResolver):
    """VPC Lattice resolver

    Documentation:
    - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html
    - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html#vpc-lattice-receiving-events

    Examples
    --------
    Simple example integrating with Tracer

    ```python
    from aws_lambda_powertools import Tracer
    from aws_lambda_powertools.event_handler import VPCLatticeResolver

    tracer = Tracer()
    app = VPCLatticeResolver()

    @app.get("/get-call")
    def simple_get():
        return {"message": "Foo"}

    @app.post("/post-call")
    def simple_post():
        post_data: dict = app.current_event.json_body
        return {"message": post_data}

    @tracer.capture_lambda_handler
    def lambda_handler(event, context):
        return app.resolve(event, context)
    """

    current_event: VPCLatticeEvent

    def __init__(
        self,
        cors: CORSConfig | None = None,
        debug: bool | None = None,
        serializer: Callable[[dict], str] | None = None,
        strip_prefixes: list[str | Pattern] | None = None,
        enable_validation: bool = False,
    ):
        """Amazon VPC Lattice resolver"""
        super().__init__(ProxyEventType.VPCLatticeEvent, cors, debug, serializer, strip_prefixes, enable_validation)

    def _get_base_path(self) -> str:
        return ""

Ancestors

Class variables

var current_event : VPCLatticeEvent

Inherited members

class VPCLatticeV2Resolver (cors: CORSConfig | None = None, debug: bool | None = None, serializer: Callable[[dict], str] | None = None, strip_prefixes: list[str | Pattern] | None = None, enable_validation: bool = False)

VPC Lattice resolver

Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html#vpc-lattice-receiving-events

Examples

Simple example integrating with Tracer

```python from aws_lambda_powertools import Tracer from aws_lambda_powertools.event_handler import VPCLatticeV2Resolver

tracer = Tracer() app = VPCLatticeV2Resolver()

@app.get("/get-call") def simple_get(): return {"message": "Foo"}

@app.post("/post-call") def simple_post(): post_data: dict = app.current_event.json_body return {"message": post_data}

@tracer.capture_lambda_handler def lambda_handler(event, context): return app.resolve(event, context)

Amazon VPC Lattice resolver

Expand source code
class VPCLatticeV2Resolver(ApiGatewayResolver):
    """VPC Lattice resolver

    Documentation:
    - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html
    - https://docs.aws.amazon.com/lambda/latest/dg/services-vpc-lattice.html#vpc-lattice-receiving-events

    Examples
    --------
    Simple example integrating with Tracer

    ```python
    from aws_lambda_powertools import Tracer
    from aws_lambda_powertools.event_handler import VPCLatticeV2Resolver

    tracer = Tracer()
    app = VPCLatticeV2Resolver()

    @app.get("/get-call")
    def simple_get():
        return {"message": "Foo"}

    @app.post("/post-call")
    def simple_post():
        post_data: dict = app.current_event.json_body
        return {"message": post_data}

    @tracer.capture_lambda_handler
    def lambda_handler(event, context):
        return app.resolve(event, context)
    """

    current_event: VPCLatticeEventV2

    def __init__(
        self,
        cors: CORSConfig | None = None,
        debug: bool | None = None,
        serializer: Callable[[dict], str] | None = None,
        strip_prefixes: list[str | Pattern] | None = None,
        enable_validation: bool = False,
    ):
        """Amazon VPC Lattice resolver"""
        super().__init__(ProxyEventType.VPCLatticeEventV2, cors, debug, serializer, strip_prefixes, enable_validation)

    def _get_base_path(self) -> str:
        return ""

Ancestors

Class variables

var current_event : VPCLatticeEventV2

Inherited members