Module aws_lambda_powertools.event_handler.graphql_appsync.router

Classes

class Router

Abstract base class for Router (resolvers)

Expand source code
class Router(BaseRouter):
    context: dict
    current_batch_event: List[AppSyncResolverEvent] = []
    current_event: Optional[AppSyncResolverEvent] = None
    lambda_context: Optional[LambdaContext] = None

    def __init__(self):
        self.context = {}  # early init as customers might add context before event resolution
        self._resolver_registry = ResolverRegistry()
        self._batch_resolver_registry = ResolverRegistry()
        self._async_batch_resolver_registry = ResolverRegistry()

    def resolver(self, type_name: str = "*", field_name: Optional[str] = None) -> Callable:
        return self._resolver_registry.register(field_name=field_name, type_name=type_name)

    def batch_resolver(
        self,
        type_name: str = "*",
        field_name: Optional[str] = None,
        raise_on_error: bool = False,
        aggregate: bool = True,
    ) -> Callable:
        return self._batch_resolver_registry.register(
            field_name=field_name,
            type_name=type_name,
            raise_on_error=raise_on_error,
            aggregate=aggregate,
        )

    def async_batch_resolver(
        self,
        type_name: str = "*",
        field_name: Optional[str] = None,
        raise_on_error: bool = False,
        aggregate: bool = True,
    ) -> Callable:
        return self._async_batch_resolver_registry.register(
            field_name=field_name,
            type_name=type_name,
            raise_on_error=raise_on_error,
            aggregate=aggregate,
        )

    def append_context(self, **additional_context):
        """Append key=value data as routing context"""
        self.context.update(**additional_context)

    def clear_context(self):
        """Resets routing context"""
        self.context.clear()

Ancestors

Subclasses

Class variables

var context : dict
var current_batch_event : List[AppSyncResolverEvent]
var current_event : Optional[AppSyncResolverEvent]
var lambda_context : Optional[LambdaContext]

Methods

def append_context(self, **additional_context)

Append key=value data as routing context

def clear_context(self)

Resets routing context

Inherited members