Module aws_lambda_powertools.utilities.data_classes

Event Source Data Classes utility provides classes self-describing Lambda event sources.

Expand source code
"""
Event Source Data Classes utility provides classes self-describing Lambda event sources.
"""

from .alb_event import ALBEvent
from .api_gateway_proxy_event import APIGatewayProxyEvent, APIGatewayProxyEventV2
from .appsync_resolver_event import AppSyncResolverEvent
from .aws_config_rule_event import AWSConfigRuleEvent
from .cloud_watch_custom_widget_event import CloudWatchDashboardCustomWidgetEvent
from .cloud_watch_logs_event import CloudWatchLogsEvent
from .code_pipeline_job_event import CodePipelineJobEvent
from .connect_contact_flow_event import ConnectContactFlowEvent
from .dynamo_db_stream_event import DynamoDBStreamEvent
from .event_bridge_event import EventBridgeEvent
from .event_source import event_source
from .kafka_event import KafkaEvent
from .kinesis_firehose_event import KinesisFirehoseEvent
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_event import S3Event, S3EventBridgeNotificationEvent
from .ses_event import SESEvent
from .sns_event import SNSEvent
from .sqs_event import SQSEvent
from .vpc_lattice import VPCLatticeEvent

__all__ = [
    "APIGatewayProxyEvent",
    "APIGatewayProxyEventV2",
    "AppSyncResolverEvent",
    "ALBEvent",
    "CloudWatchDashboardCustomWidgetEvent",
    "CloudWatchLogsEvent",
    "CodePipelineJobEvent",
    "ConnectContactFlowEvent",
    "DynamoDBStreamEvent",
    "EventBridgeEvent",
    "KafkaEvent",
    "KinesisFirehoseEvent",
    "KinesisStreamEvent",
    "LambdaFunctionUrlEvent",
    "S3Event",
    "S3EventBridgeNotificationEvent",
    "SESEvent",
    "SNSEvent",
    "SQSEvent",
    "event_source",
    "AWSConfigRuleEvent",
    "VPCLatticeEvent",
]

Sub-modules

aws_lambda_powertools.utilities.data_classes.active_mq_event
aws_lambda_powertools.utilities.data_classes.alb_event
aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event
aws_lambda_powertools.utilities.data_classes.api_gateway_proxy_event
aws_lambda_powertools.utilities.data_classes.appsync
aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event
aws_lambda_powertools.utilities.data_classes.appsync_resolver_event
aws_lambda_powertools.utilities.data_classes.aws_config_rule_event
aws_lambda_powertools.utilities.data_classes.cloud_watch_custom_widget_event
aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event
aws_lambda_powertools.utilities.data_classes.code_pipeline_job_event
aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event
aws_lambda_powertools.utilities.data_classes.common
aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event
aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event
aws_lambda_powertools.utilities.data_classes.event_bridge_event
aws_lambda_powertools.utilities.data_classes.kafka_event
aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event
aws_lambda_powertools.utilities.data_classes.kinesis_stream_event
aws_lambda_powertools.utilities.data_classes.lambda_function_url_event
aws_lambda_powertools.utilities.data_classes.rabbit_mq_event
aws_lambda_powertools.utilities.data_classes.s3_event
aws_lambda_powertools.utilities.data_classes.s3_object_event
aws_lambda_powertools.utilities.data_classes.ses_event
aws_lambda_powertools.utilities.data_classes.sns_event
aws_lambda_powertools.utilities.data_classes.sqs_event
aws_lambda_powertools.utilities.data_classes.vpc_lattice

Functions

def event_source(handler: Callable[[Any, LambdaContext], Any], event: Dict[str, Any], context: LambdaContext, data_class: Type[DictWrapper])

Middleware to create an instance of the passed in event source data class

Parameters

handler : Callable
Lambda's handler
event : Dict
Lambda's Event
context : Dict
Lambda's Context
data_class : Type[DictWrapper]
Data class type to instantiate

Example

Sample usage

from aws_lambda_powertools.utilities.data_classes import S3Event, event_source

@event_source(data_class=S3Event)
def handler(event: S3Event, context):
     return {"key": event.object_key}
Expand source code
@lambda_handler_decorator
def event_source(
    handler: Callable[[Any, LambdaContext], Any],
    event: Dict[str, Any],
    context: LambdaContext,
    data_class: Type[DictWrapper],
):
    """Middleware to create an instance of the passed in event source data class

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: Dict
        Lambda's Context
    data_class: Type[DictWrapper]
        Data class type to instantiate

    Example
    --------

    **Sample usage**

        from aws_lambda_powertools.utilities.data_classes import S3Event, event_source

        @event_source(data_class=S3Event)
        def handler(event: S3Event, context):
             return {"key": event.object_key}
    """
    return handler(data_class(event), context)

Classes

class ALBEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Application load balancer event

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class ALBEvent(BaseProxyEvent):
    """Application load balancer event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-alb.html
    - https://docs.aws.amazon.com/elasticloadbalancing/latest/application/lambda-functions.html
    """

    @property
    def request_context(self) -> ALBEventRequestContext:
        return ALBEventRequestContext(self._data)

    @property
    def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]:
        return self.get("multiValueQueryStringParameters")

    @property
    def multi_value_headers(self) -> Optional[Dict[str, List[str]]]:
        return self.get("multiValueHeaders")

    def header_serializer(self) -> BaseHeadersSerializer:
        # When using the ALB integration, the `multiValueHeaders` feature can be disabled (default) or enabled.
        # We can determine if the feature is enabled by looking if the event has a `multiValueHeaders` key.
        if self.multi_value_headers:
            return MultiValueHeadersSerializer()

        return SingleValueHeadersSerializer()

Ancestors

  • BaseProxyEvent
  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var multi_value_headers : Optional[Dict[str, List[str]]]
Expand source code
@property
def multi_value_headers(self) -> Optional[Dict[str, List[str]]]:
    return self.get("multiValueHeaders")
var multi_value_query_string_parameters : Optional[Dict[str, List[str]]]
Expand source code
@property
def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]:
    return self.get("multiValueQueryStringParameters")
var request_contextALBEventRequestContext
Expand source code
@property
def request_context(self) -> ALBEventRequestContext:
    return ALBEventRequestContext(self._data)

Methods

def header_serializer(self) ‑> BaseHeadersSerializer
Expand source code
def header_serializer(self) -> BaseHeadersSerializer:
    # When using the ALB integration, the `multiValueHeaders` feature can be disabled (default) or enabled.
    # We can determine if the feature is enabled by looking if the event has a `multiValueHeaders` key.
    if self.multi_value_headers:
        return MultiValueHeadersSerializer()

    return SingleValueHeadersSerializer()

Inherited members

class APIGatewayProxyEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

AWS Lambda proxy V1

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class APIGatewayProxyEvent(BaseProxyEvent):
    """AWS Lambda proxy V1

    Documentation:
    --------------
    - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html
    """

    @property
    def version(self) -> str:
        return self["version"]

    @property
    def resource(self) -> str:
        return self["resource"]

    @property
    def multi_value_headers(self) -> Dict[str, List[str]]:
        return self["multiValueHeaders"]

    @property
    def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]:
        return self.get("multiValueQueryStringParameters")

    @property
    def request_context(self) -> APIGatewayEventRequestContext:
        return APIGatewayEventRequestContext(self._data)

    @property
    def path_parameters(self) -> Optional[Dict[str, str]]:
        return self.get("pathParameters")

    @property
    def stage_variables(self) -> Optional[Dict[str, str]]:
        return self.get("stageVariables")

    def header_serializer(self) -> BaseHeadersSerializer:
        return MultiValueHeadersSerializer()

Ancestors

  • BaseProxyEvent
  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var multi_value_headers : Dict[str, List[str]]
Expand source code
@property
def multi_value_headers(self) -> Dict[str, List[str]]:
    return self["multiValueHeaders"]
var multi_value_query_string_parameters : Optional[Dict[str, List[str]]]
Expand source code
@property
def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]:
    return self.get("multiValueQueryStringParameters")
var path_parameters : Optional[Dict[str, str]]
Expand source code
@property
def path_parameters(self) -> Optional[Dict[str, str]]:
    return self.get("pathParameters")
var request_contextAPIGatewayEventRequestContext
Expand source code
@property
def request_context(self) -> APIGatewayEventRequestContext:
    return APIGatewayEventRequestContext(self._data)
var resource : str
Expand source code
@property
def resource(self) -> str:
    return self["resource"]
var stage_variables : Optional[Dict[str, str]]
Expand source code
@property
def stage_variables(self) -> Optional[Dict[str, str]]:
    return self.get("stageVariables")
var version : str
Expand source code
@property
def version(self) -> str:
    return self["version"]

Methods

def header_serializer(self) ‑> BaseHeadersSerializer
Expand source code
def header_serializer(self) -> BaseHeadersSerializer:
    return MultiValueHeadersSerializer()

Inherited members

class APIGatewayProxyEventV2 (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

AWS Lambda proxy V2 event

Notes:

Format 2.0 doesn't have multiValueHeaders or multiValueQueryStringParameters fields. Duplicate headers are combined with commas and included in the headers field. Duplicate query strings are combined with commas and included in the queryStringParameters field.

Format 2.0 includes a new cookies field. All cookie headers in the request are combined with commas and added to the cookies field. In the response to the client, each cookie becomes a set-cookie header.

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class APIGatewayProxyEventV2(BaseProxyEvent):
    """AWS Lambda proxy V2 event

    Notes:
    -----
    Format 2.0 doesn't have multiValueHeaders or multiValueQueryStringParameters fields. Duplicate headers
    are combined with commas and included in the headers field. Duplicate query strings are combined with
    commas and included in the queryStringParameters field.

    Format 2.0 includes a new cookies field. All cookie headers in the request are combined with commas and
    added to the cookies field. In the response to the client, each cookie becomes a set-cookie header.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html
    """

    @property
    def version(self) -> str:
        return self["version"]

    @property
    def route_key(self) -> str:
        return self["routeKey"]

    @property
    def raw_path(self) -> str:
        return self["rawPath"]

    @property
    def raw_query_string(self) -> str:
        return self["rawQueryString"]

    @property
    def cookies(self) -> Optional[List[str]]:
        return self.get("cookies")

    @property
    def request_context(self) -> RequestContextV2:
        return RequestContextV2(self._data)

    @property
    def path_parameters(self) -> Optional[Dict[str, str]]:
        return self.get("pathParameters")

    @property
    def stage_variables(self) -> Optional[Dict[str, str]]:
        return self.get("stageVariables")

    @property
    def path(self) -> str:
        stage = self.request_context.stage
        if stage != "$default":
            return self.raw_path[len("/" + stage) :]
        return self.raw_path

    @property
    def http_method(self) -> str:
        """The HTTP method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT."""
        return self.request_context.http.method

    def header_serializer(self):
        return HttpApiHeadersSerializer()

Ancestors

  • BaseProxyEvent
  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Subclasses

Instance variables

var cookies : Optional[List[str]]
Expand source code
@property
def cookies(self) -> Optional[List[str]]:
    return self.get("cookies")
var path : str
Expand source code
@property
def path(self) -> str:
    stage = self.request_context.stage
    if stage != "$default":
        return self.raw_path[len("/" + stage) :]
    return self.raw_path
var path_parameters : Optional[Dict[str, str]]
Expand source code
@property
def path_parameters(self) -> Optional[Dict[str, str]]:
    return self.get("pathParameters")
var raw_path : str
Expand source code
@property
def raw_path(self) -> str:
    return self["rawPath"]
var raw_query_string : str
Expand source code
@property
def raw_query_string(self) -> str:
    return self["rawQueryString"]
var request_contextRequestContextV2
Expand source code
@property
def request_context(self) -> RequestContextV2:
    return RequestContextV2(self._data)
var route_key : str
Expand source code
@property
def route_key(self) -> str:
    return self["routeKey"]
var stage_variables : Optional[Dict[str, str]]
Expand source code
@property
def stage_variables(self) -> Optional[Dict[str, str]]:
    return self.get("stageVariables")
var version : str
Expand source code
@property
def version(self) -> str:
    return self["version"]

Methods

def header_serializer(self)
Expand source code
def header_serializer(self):
    return HttpApiHeadersSerializer()

Inherited members

class AWSConfigRuleEvent (data: Dict[str, Any])

Events for AWS Config Rules Documentation:


Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class AWSConfigRuleEvent(DictWrapper):
    """Events for AWS Config Rules
    Documentation:
    --------------
    - https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_lambda-functions.html
    """

    def __init__(self, data: Dict[str, Any]):
        super().__init__(data)
        self._invoking_event: Optional[Any] = None
        self._rule_parameters: Optional[Any] = None

    @property
    def version(self) -> str:
        """The version of the event."""
        return self["version"]

    @property
    def invoking_event(
        self,
    ) -> AWSConfigConfigurationChanged | AWSConfigScheduledNotification | AWSConfigOversizedConfiguration:
        """The invoking payload of the event."""
        if self._invoking_event is None:
            self._invoking_event = self["invokingEvent"]

        return get_invoke_event(json.loads(self._invoking_event))

    @property
    def raw_invoking_event(self) -> str:
        """The raw invoking payload of the event."""
        return self["invokingEvent"]

    @property
    def rule_parameters(self) -> Dict:
        """The parameters of the event."""
        if self._rule_parameters is None:
            self._rule_parameters = self["ruleParameters"]

        return json.loads(self._rule_parameters)

    @property
    def result_token(self) -> str:
        """The result token of the event."""
        return self["resultToken"]

    @property
    def event_left_scope(self) -> bool:
        """The left scope of the event."""
        return self["eventLeftScope"]

    @property
    def execution_role_arn(self) -> str:
        """The execution role arn of the event."""
        return self["executionRoleArn"]

    @property
    def config_rule_arn(self) -> str:
        """The arn of the rule of the event."""
        return self["configRuleArn"]

    @property
    def config_rule_name(self) -> str:
        """The name of the rule of the event."""
        return self["configRuleName"]

    @property
    def config_rule_id(self) -> str:
        """The id of the rule of the event."""
        return self["configRuleId"]

    @property
    def accountid(self) -> str:
        """The accountid of the event."""
        return self["accountId"]

    @property
    def evalution_mode(self) -> Optional[str]:
        """The evalution mode of the event."""
        return self.get("evaluationMode")

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var accountid : str

The accountid of the event.

Expand source code
@property
def accountid(self) -> str:
    """The accountid of the event."""
    return self["accountId"]
var config_rule_arn : str

The arn of the rule of the event.

Expand source code
@property
def config_rule_arn(self) -> str:
    """The arn of the rule of the event."""
    return self["configRuleArn"]
var config_rule_id : str

The id of the rule of the event.

Expand source code
@property
def config_rule_id(self) -> str:
    """The id of the rule of the event."""
    return self["configRuleId"]
var config_rule_name : str

The name of the rule of the event.

Expand source code
@property
def config_rule_name(self) -> str:
    """The name of the rule of the event."""
    return self["configRuleName"]
var evalution_mode : Optional[str]

The evalution mode of the event.

Expand source code
@property
def evalution_mode(self) -> Optional[str]:
    """The evalution mode of the event."""
    return self.get("evaluationMode")
var event_left_scope : bool

The left scope of the event.

Expand source code
@property
def event_left_scope(self) -> bool:
    """The left scope of the event."""
    return self["eventLeftScope"]
var execution_role_arn : str

The execution role arn of the event.

Expand source code
@property
def execution_role_arn(self) -> str:
    """The execution role arn of the event."""
    return self["executionRoleArn"]
var invoking_eventAWSConfigConfigurationChanged | AWSConfigScheduledNotification | AWSConfigOversizedConfiguration

The invoking payload of the event.

Expand source code
@property
def invoking_event(
    self,
) -> AWSConfigConfigurationChanged | AWSConfigScheduledNotification | AWSConfigOversizedConfiguration:
    """The invoking payload of the event."""
    if self._invoking_event is None:
        self._invoking_event = self["invokingEvent"]

    return get_invoke_event(json.loads(self._invoking_event))
var raw_invoking_event : str

The raw invoking payload of the event.

Expand source code
@property
def raw_invoking_event(self) -> str:
    """The raw invoking payload of the event."""
    return self["invokingEvent"]
var result_token : str

The result token of the event.

Expand source code
@property
def result_token(self) -> str:
    """The result token of the event."""
    return self["resultToken"]
var rule_parameters : Dict

The parameters of the event.

Expand source code
@property
def rule_parameters(self) -> Dict:
    """The parameters of the event."""
    if self._rule_parameters is None:
        self._rule_parameters = self["ruleParameters"]

    return json.loads(self._rule_parameters)
var version : str

The version of the event.

Expand source code
@property
def version(self) -> str:
    """The version of the event."""
    return self["version"]

Inherited members

class AppSyncResolverEvent (data: dict)

AppSync resolver event

NOTE: AppSync Resolver Events can come in various shapes this data class supports both Amplify GraphQL directive @function and Direct Lambda Resolver

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class AppSyncResolverEvent(DictWrapper):
    """AppSync resolver event

    **NOTE:** AppSync Resolver Events can come in various shapes this data class
    supports both Amplify GraphQL directive @function and Direct Lambda Resolver

    Documentation:
    -------------
    - https://docs.aws.amazon.com/appsync/latest/devguide/resolver-context-reference.html
    - https://docs.amplify.aws/cli/graphql-transformer/function#structure-of-the-function-event
    """

    def __init__(self, data: dict):
        super().__init__(data)

        info: Optional[dict] = data.get("info")
        if not info:
            info = {"fieldName": self.get("fieldName"), "parentTypeName": self.get("typeName")}

        self._info = AppSyncResolverEventInfo(info)

    @property
    def type_name(self) -> str:
        """The name of the parent type for the field that is currently being resolved."""
        return self.info.parent_type_name

    @property
    def field_name(self) -> str:
        """The name of the field that is currently being resolved."""
        return self.info.field_name

    @property
    def arguments(self) -> Dict[str, Any]:
        """A map that contains all GraphQL arguments for this field."""
        return self["arguments"]

    @property
    def identity(self) -> Union[None, AppSyncIdentityIAM, AppSyncIdentityCognito]:
        """An object that contains information about the caller.

        Depending on the type of identify found:

        - API_KEY authorization - returns None
        - AWS_IAM authorization - returns AppSyncIdentityIAM
        - AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito
        """
        return get_identity_object(self.get("identity"))

    @property
    def source(self) -> Optional[Dict[str, Any]]:
        """A map that contains the resolution of the parent field."""
        return self.get("source")

    @property
    def request_headers(self) -> Dict[str, str]:
        """Request headers"""
        return self["request"]["headers"]

    @property
    def prev_result(self) -> Optional[Dict[str, Any]]:
        """It represents the result of whatever previous operation was executed in a pipeline resolver."""
        prev = self.get("prev")
        if not prev:
            return None
        return prev.get("result")

    @property
    def info(self) -> AppSyncResolverEventInfo:
        """The info section contains information about the GraphQL request."""
        return self._info

    @property
    def stash(self) -> Optional[dict]:
        """The stash is a map that is made available inside each resolver and function mapping template.
        The same stash instance lives through a single resolver execution. This means that you can use the
        stash to pass arbitrary data across request and response mapping templates, and across functions in
        a pipeline resolver."""
        return self.get("stash")

    def get_header_value(
        self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False
    ) -> Optional[str]:
        """Get header value by name

        Parameters
        ----------
        name: str
            Header name
        default_value: str, optional
            Default value if no value was found by name
        case_sensitive: bool
            Whether to use a case-sensitive look up
        Returns
        -------
        str, optional
            Header value
        """
        return get_header_value(self.request_headers, name, default_value, case_sensitive)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var arguments : Dict[str, Any]

A map that contains all GraphQL arguments for this field.

Expand source code
@property
def arguments(self) -> Dict[str, Any]:
    """A map that contains all GraphQL arguments for this field."""
    return self["arguments"]
var field_name : str

The name of the field that is currently being resolved.

Expand source code
@property
def field_name(self) -> str:
    """The name of the field that is currently being resolved."""
    return self.info.field_name
var identity : Union[ForwardRef(None), AppSyncIdentityIAMAppSyncIdentityCognito]

An object that contains information about the caller.

Depending on the type of identify found:

  • API_KEY authorization - returns None
  • AWS_IAM authorization - returns AppSyncIdentityIAM
  • AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito
Expand source code
@property
def identity(self) -> Union[None, AppSyncIdentityIAM, AppSyncIdentityCognito]:
    """An object that contains information about the caller.

    Depending on the type of identify found:

    - API_KEY authorization - returns None
    - AWS_IAM authorization - returns AppSyncIdentityIAM
    - AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito
    """
    return get_identity_object(self.get("identity"))
var infoAppSyncResolverEventInfo

The info section contains information about the GraphQL request.

Expand source code
@property
def info(self) -> AppSyncResolverEventInfo:
    """The info section contains information about the GraphQL request."""
    return self._info
var prev_result : Optional[Dict[str, Any]]

It represents the result of whatever previous operation was executed in a pipeline resolver.

Expand source code
@property
def prev_result(self) -> Optional[Dict[str, Any]]:
    """It represents the result of whatever previous operation was executed in a pipeline resolver."""
    prev = self.get("prev")
    if not prev:
        return None
    return prev.get("result")
var request_headers : Dict[str, str]

Request headers

Expand source code
@property
def request_headers(self) -> Dict[str, str]:
    """Request headers"""
    return self["request"]["headers"]
var source : Optional[Dict[str, Any]]

A map that contains the resolution of the parent field.

Expand source code
@property
def source(self) -> Optional[Dict[str, Any]]:
    """A map that contains the resolution of the parent field."""
    return self.get("source")
var stash : Optional[dict]

The stash is a map that is made available inside each resolver and function mapping template. The same stash instance lives through a single resolver execution. This means that you can use the stash to pass arbitrary data across request and response mapping templates, and across functions in a pipeline resolver.

Expand source code
@property
def stash(self) -> Optional[dict]:
    """The stash is a map that is made available inside each resolver and function mapping template.
    The same stash instance lives through a single resolver execution. This means that you can use the
    stash to pass arbitrary data across request and response mapping templates, and across functions in
    a pipeline resolver."""
    return self.get("stash")
var type_name : str

The name of the parent type for the field that is currently being resolved.

Expand source code
@property
def type_name(self) -> str:
    """The name of the parent type for the field that is currently being resolved."""
    return self.info.parent_type_name

Methods

def get_header_value(self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False) ‑> Optional[str]

Get header value by name

Parameters

name : str
Header name
default_value : str, optional
Default value if no value was found by name
case_sensitive : bool
Whether to use a case-sensitive look up

Returns

str, optional
Header value
Expand source code
def get_header_value(
    self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False
) -> Optional[str]:
    """Get header value by name

    Parameters
    ----------
    name: str
        Header name
    default_value: str, optional
        Default value if no value was found by name
    case_sensitive: bool
        Whether to use a case-sensitive look up
    Returns
    -------
    str, optional
        Header value
    """
    return get_header_value(self.request_headers, name, default_value, case_sensitive)

Inherited members

class CloudWatchDashboardCustomWidgetEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

CloudWatch dashboard custom widget event

You can use a Lambda function to create a custom widget on a CloudWatch dashboard.

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class CloudWatchDashboardCustomWidgetEvent(DictWrapper):
    """CloudWatch dashboard custom widget event

    You can use a Lambda function to create a custom widget on a CloudWatch dashboard.

    Documentation:
    -------------
    - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/add_custom_widget_dashboard_about.html
    """

    @property
    def describe(self) -> bool:
        """Display widget documentation"""
        return bool(self.get("describe", False))

    @property
    def widget_context(self) -> Optional[CloudWatchWidgetContext]:
        """The widget context"""
        if self.get("widgetContext"):
            return CloudWatchWidgetContext(self["widgetContext"])

        return None

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var describe : bool

Display widget documentation

Expand source code
@property
def describe(self) -> bool:
    """Display widget documentation"""
    return bool(self.get("describe", False))
var widget_context : Optional[CloudWatchWidgetContext]

The widget context

Expand source code
@property
def widget_context(self) -> Optional[CloudWatchWidgetContext]:
    """The widget context"""
    if self.get("widgetContext"):
        return CloudWatchWidgetContext(self["widgetContext"])

    return None

Inherited members

class CloudWatchLogsEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

CloudWatch Logs log stream event

You can use a Lambda function to monitor and analyze logs from an Amazon CloudWatch Logs log stream.

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class CloudWatchLogsEvent(DictWrapper):
    """CloudWatch Logs log stream event

    You can use a Lambda function to monitor and analyze logs from an Amazon CloudWatch Logs log stream.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html
    """

    _decompressed_logs_data = None
    _json_logs_data = None

    @property
    def raw_logs_data(self) -> str:
        """The value of the `data` field is a Base64 encoded ZIP archive."""
        return self["awslogs"]["data"]

    @property
    def decompress_logs_data(self) -> bytes:
        """Decode and decompress log data"""
        if self._decompressed_logs_data is None:
            payload = base64.b64decode(self.raw_logs_data)
            self._decompressed_logs_data = zlib.decompress(payload, zlib.MAX_WBITS | 32)
        return self._decompressed_logs_data

    def parse_logs_data(self) -> CloudWatchLogsDecodedData:
        """Decode, decompress and parse json data as CloudWatchLogsDecodedData"""
        if self._json_logs_data is None:
            self._json_logs_data = json.loads(self.decompress_logs_data.decode("UTF-8"))
        return CloudWatchLogsDecodedData(self._json_logs_data)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var decompress_logs_data : bytes

Decode and decompress log data

Expand source code
@property
def decompress_logs_data(self) -> bytes:
    """Decode and decompress log data"""
    if self._decompressed_logs_data is None:
        payload = base64.b64decode(self.raw_logs_data)
        self._decompressed_logs_data = zlib.decompress(payload, zlib.MAX_WBITS | 32)
    return self._decompressed_logs_data
var raw_logs_data : str

The value of the data field is a Base64 encoded ZIP archive.

Expand source code
@property
def raw_logs_data(self) -> str:
    """The value of the `data` field is a Base64 encoded ZIP archive."""
    return self["awslogs"]["data"]

Methods

def parse_logs_data(self) ‑> CloudWatchLogsDecodedData

Decode, decompress and parse json data as CloudWatchLogsDecodedData

Expand source code
def parse_logs_data(self) -> CloudWatchLogsDecodedData:
    """Decode, decompress and parse json data as CloudWatchLogsDecodedData"""
    if self._json_logs_data is None:
        self._json_logs_data = json.loads(self.decompress_logs_data.decode("UTF-8"))
    return CloudWatchLogsDecodedData(self._json_logs_data)

Inherited members

class CodePipelineJobEvent (data: Dict[str, Any])

AWS CodePipeline Job Event

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class CodePipelineJobEvent(DictWrapper):
    """AWS CodePipeline Job Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/codepipeline/latest/userguide/actions-invoke-lambda-function.html
    - https://docs.aws.amazon.com/lambda/latest/dg/services-codepipeline.html
    """

    def __init__(self, data: Dict[str, Any]):
        super().__init__(data)
        self._job = self["CodePipeline.job"]

    @property
    def get_id(self) -> str:
        """Job id"""
        return self._job["id"]

    @property
    def account_id(self) -> str:
        """Account id"""
        return self._job["accountId"]

    @property
    def data(self) -> CodePipelineData:
        """Code pipeline jab data"""
        return CodePipelineData(self._job["data"])

    @property
    def user_parameters(self) -> Optional[str]:
        """Action configuration user parameters"""
        return self.data.action_configuration.configuration.user_parameters

    @property
    def decoded_user_parameters(self) -> Optional[Dict[str, Any]]:
        """Json Decoded action configuration user parameters"""
        return self.data.action_configuration.configuration.decoded_user_parameters

    @property
    def input_bucket_name(self) -> str:
        """Get the first input artifact bucket name"""
        return self.data.input_artifacts[0].location.s3_location.bucket_name

    @property
    def input_object_key(self) -> str:
        """Get the first input artifact order key unquote plus"""
        return self.data.input_artifacts[0].location.s3_location.object_key

    def setup_s3_client(self):
        """Creates an S3 client

        Uses the credentials passed in the event by CodePipeline. These
        credentials can be used to access the artifact bucket.

        Returns
        -------
        BaseClient
            An S3 client with the appropriate credentials
        """
        # IMPORTING boto3 within the FUNCTION and not at the top level to get
        # it only when we explicitly want it for better performance.
        import boto3

        from aws_lambda_powertools.shared import user_agent

        s3 = boto3.client(
            "s3",
            aws_access_key_id=self.data.artifact_credentials.access_key_id,
            aws_secret_access_key=self.data.artifact_credentials.secret_access_key,
            aws_session_token=self.data.artifact_credentials.session_token,
        )
        user_agent.register_feature_to_client(client=s3, feature="data_classes")
        return s3

    def find_input_artifact(self, artifact_name: str) -> Optional[CodePipelineArtifact]:
        """Find an input artifact by artifact name

        Parameters
        ----------
        artifact_name : str
            The name of the input artifact to look for

        Returns
        -------
        CodePipelineArtifact, None
            Matching CodePipelineArtifact if found
        """
        for artifact in self.data.input_artifacts:
            if artifact.name == artifact_name:
                return artifact
        return None

    def get_artifact(self, artifact_name: str, filename: str) -> Optional[str]:
        """Get a file within an artifact zip on s3

        Parameters
        ----------
        artifact_name : str
            Name of the S3 artifact to download
        filename : str
            The file name within the artifact zip to extract as a string

        Returns
        -------
        str, None
            Returns the contents file contents as a string
        """
        artifact = self.find_input_artifact(artifact_name)
        if artifact is None:
            return None

        with tempfile.NamedTemporaryFile() as tmp_file:
            s3 = self.setup_s3_client()
            bucket = artifact.location.s3_location.bucket_name
            key = artifact.location.s3_location.key
            s3.download_file(bucket, key, tmp_file.name)
            with zipfile.ZipFile(tmp_file.name, "r") as zip_file:
                return zip_file.read(filename).decode("UTF-8")

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var account_id : str

Account id

Expand source code
@property
def account_id(self) -> str:
    """Account id"""
    return self._job["accountId"]
var dataCodePipelineData

Code pipeline jab data

Expand source code
@property
def data(self) -> CodePipelineData:
    """Code pipeline jab data"""
    return CodePipelineData(self._job["data"])
var decoded_user_parameters : Optional[Dict[str, Any]]

Json Decoded action configuration user parameters

Expand source code
@property
def decoded_user_parameters(self) -> Optional[Dict[str, Any]]:
    """Json Decoded action configuration user parameters"""
    return self.data.action_configuration.configuration.decoded_user_parameters
var get_id : str

Job id

Expand source code
@property
def get_id(self) -> str:
    """Job id"""
    return self._job["id"]
var input_bucket_name : str

Get the first input artifact bucket name

Expand source code
@property
def input_bucket_name(self) -> str:
    """Get the first input artifact bucket name"""
    return self.data.input_artifacts[0].location.s3_location.bucket_name
var input_object_key : str

Get the first input artifact order key unquote plus

Expand source code
@property
def input_object_key(self) -> str:
    """Get the first input artifact order key unquote plus"""
    return self.data.input_artifacts[0].location.s3_location.object_key
var user_parameters : Optional[str]

Action configuration user parameters

Expand source code
@property
def user_parameters(self) -> Optional[str]:
    """Action configuration user parameters"""
    return self.data.action_configuration.configuration.user_parameters

Methods

def find_input_artifact(self, artifact_name: str) ‑> Optional[CodePipelineArtifact]

Find an input artifact by artifact name

Parameters

artifact_name : str
The name of the input artifact to look for

Returns

CodePipelineArtifact, None
Matching CodePipelineArtifact if found
Expand source code
def find_input_artifact(self, artifact_name: str) -> Optional[CodePipelineArtifact]:
    """Find an input artifact by artifact name

    Parameters
    ----------
    artifact_name : str
        The name of the input artifact to look for

    Returns
    -------
    CodePipelineArtifact, None
        Matching CodePipelineArtifact if found
    """
    for artifact in self.data.input_artifacts:
        if artifact.name == artifact_name:
            return artifact
    return None
def get_artifact(self, artifact_name: str, filename: str) ‑> Optional[str]

Get a file within an artifact zip on s3

Parameters

artifact_name : str
Name of the S3 artifact to download
filename : str
The file name within the artifact zip to extract as a string

Returns

str, None
Returns the contents file contents as a string
Expand source code
def get_artifact(self, artifact_name: str, filename: str) -> Optional[str]:
    """Get a file within an artifact zip on s3

    Parameters
    ----------
    artifact_name : str
        Name of the S3 artifact to download
    filename : str
        The file name within the artifact zip to extract as a string

    Returns
    -------
    str, None
        Returns the contents file contents as a string
    """
    artifact = self.find_input_artifact(artifact_name)
    if artifact is None:
        return None

    with tempfile.NamedTemporaryFile() as tmp_file:
        s3 = self.setup_s3_client()
        bucket = artifact.location.s3_location.bucket_name
        key = artifact.location.s3_location.key
        s3.download_file(bucket, key, tmp_file.name)
        with zipfile.ZipFile(tmp_file.name, "r") as zip_file:
            return zip_file.read(filename).decode("UTF-8")
def setup_s3_client(self)

Creates an S3 client

Uses the credentials passed in the event by CodePipeline. These credentials can be used to access the artifact bucket.

Returns

BaseClient
An S3 client with the appropriate credentials
Expand source code
def setup_s3_client(self):
    """Creates an S3 client

    Uses the credentials passed in the event by CodePipeline. These
    credentials can be used to access the artifact bucket.

    Returns
    -------
    BaseClient
        An S3 client with the appropriate credentials
    """
    # IMPORTING boto3 within the FUNCTION and not at the top level to get
    # it only when we explicitly want it for better performance.
    import boto3

    from aws_lambda_powertools.shared import user_agent

    s3 = boto3.client(
        "s3",
        aws_access_key_id=self.data.artifact_credentials.access_key_id,
        aws_secret_access_key=self.data.artifact_credentials.secret_access_key,
        aws_session_token=self.data.artifact_credentials.session_token,
    )
    user_agent.register_feature_to_client(client=s3, feature="data_classes")
    return s3

Inherited members

class ConnectContactFlowEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Amazon Connect contact flow event

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class ConnectContactFlowEvent(DictWrapper):
    """Amazon Connect contact flow event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/connect/latest/adminguide/connect-lambda-functions.html
    """

    @property
    def contact_data(self) -> ConnectContactFlowData:
        """This is always passed by Amazon Connect for every contact. Some parameters are optional."""
        return ConnectContactFlowData(self["Details"]["ContactData"])

    @property
    def parameters(self) -> Dict[str, str]:
        """These are parameters specific to this call that were defined when you created the Lambda function."""
        return self["Details"]["Parameters"]

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var contact_dataConnectContactFlowData

This is always passed by Amazon Connect for every contact. Some parameters are optional.

Expand source code
@property
def contact_data(self) -> ConnectContactFlowData:
    """This is always passed by Amazon Connect for every contact. Some parameters are optional."""
    return ConnectContactFlowData(self["Details"]["ContactData"])
var parameters : Dict[str, str]

These are parameters specific to this call that were defined when you created the Lambda function.

Expand source code
@property
def parameters(self) -> Dict[str, str]:
    """These are parameters specific to this call that were defined when you created the Lambda function."""
    return self["Details"]["Parameters"]

Inherited members

class DynamoDBStreamEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Dynamo DB Stream Event

Documentation:

Example

Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.

from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    for record in event.records:
        # {"N": "123.45"} => Decimal("123.45")
        key: str = record.dynamodb.keys["id"]
        print(key)

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class DynamoDBStreamEvent(DictWrapper):
    """Dynamo DB Stream Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html

    Example
    -------
    **Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.**

        from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
        from aws_lambda_powertools.utilities.typing import LambdaContext


        @event_source(data_class=DynamoDBStreamEvent)
        def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
            for record in event.records:
                # {"N": "123.45"} => Decimal("123.45")
                key: str = record.dynamodb.keys["id"]
                print(key)
    """

    @property
    def records(self) -> Iterator[DynamoDBRecord]:
        for record in self["Records"]:
            yield DynamoDBRecord(record)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var records : Iterator[DynamoDBRecord]
Expand source code
@property
def records(self) -> Iterator[DynamoDBRecord]:
    for record in self["Records"]:
        yield DynamoDBRecord(record)

Inherited members

class EventBridgeEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Amazon EventBridge Event

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class EventBridgeEvent(DictWrapper):
    """Amazon EventBridge Event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/eventbridge/latest/userguide/aws-events.html
    """

    @property
    def get_id(self) -> str:
        """A unique value is generated for every event. This can be helpful in tracing events as
        they move through rules to targets, and are processed."""
        # Note: this name conflicts with existing python builtins
        return self["id"]

    @property
    def version(self) -> str:
        """By default, this is set to 0 (zero) in all events."""
        return self["version"]

    @property
    def account(self) -> str:
        """The 12-digit number identifying an AWS account."""
        return self["account"]

    @property
    def time(self) -> str:
        """The event timestamp, which can be specified by the service originating the event.

        If the event spans a time interval, the service might choose to report the start time, so
        this value can be noticeably before the time the event is actually received.
        """
        return self["time"]

    @property
    def region(self) -> str:
        """Identifies the AWS region where the event originated."""
        return self["region"]

    @property
    def resources(self) -> List[str]:
        """This JSON array contains ARNs that identify resources that are involved in the event.
        Inclusion of these ARNs is at the discretion of the service."""
        return self["resources"]

    @property
    def source(self) -> str:
        """Identifies the service that sourced the event. All events sourced from within AWS begin with "aws." """
        return self["source"]

    @property
    def detail_type(self) -> str:
        """Identifies, in combination with the source field, the fields and values that appear in the detail field."""
        return self["detail-type"]

    @property
    def detail(self) -> Dict[str, Any]:
        """A JSON object, whose content is at the discretion of the service originating the event."""
        return self["detail"]

    @property
    def replay_name(self) -> Optional[str]:
        """Identifies whether the event is being replayed and what is the name of the replay."""
        return self["replay-name"]

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Subclasses

Instance variables

var account : str

The 12-digit number identifying an AWS account.

Expand source code
@property
def account(self) -> str:
    """The 12-digit number identifying an AWS account."""
    return self["account"]
var detail : Dict[str, Any]

A JSON object, whose content is at the discretion of the service originating the event.

Expand source code
@property
def detail(self) -> Dict[str, Any]:
    """A JSON object, whose content is at the discretion of the service originating the event."""
    return self["detail"]
var detail_type : str

Identifies, in combination with the source field, the fields and values that appear in the detail field.

Expand source code
@property
def detail_type(self) -> str:
    """Identifies, in combination with the source field, the fields and values that appear in the detail field."""
    return self["detail-type"]
var get_id : str

A unique value is generated for every event. This can be helpful in tracing events as they move through rules to targets, and are processed.

Expand source code
@property
def get_id(self) -> str:
    """A unique value is generated for every event. This can be helpful in tracing events as
    they move through rules to targets, and are processed."""
    # Note: this name conflicts with existing python builtins
    return self["id"]
var region : str

Identifies the AWS region where the event originated.

Expand source code
@property
def region(self) -> str:
    """Identifies the AWS region where the event originated."""
    return self["region"]
var replay_name : Optional[str]

Identifies whether the event is being replayed and what is the name of the replay.

Expand source code
@property
def replay_name(self) -> Optional[str]:
    """Identifies whether the event is being replayed and what is the name of the replay."""
    return self["replay-name"]
var resources : List[str]

This JSON array contains ARNs that identify resources that are involved in the event. Inclusion of these ARNs is at the discretion of the service.

Expand source code
@property
def resources(self) -> List[str]:
    """This JSON array contains ARNs that identify resources that are involved in the event.
    Inclusion of these ARNs is at the discretion of the service."""
    return self["resources"]
var source : str

Identifies the service that sourced the event. All events sourced from within AWS begin with "aws."

Expand source code
@property
def source(self) -> str:
    """Identifies the service that sourced the event. All events sourced from within AWS begin with "aws." """
    return self["source"]
var time : str

The event timestamp, which can be specified by the service originating the event.

If the event spans a time interval, the service might choose to report the start time, so this value can be noticeably before the time the event is actually received.

Expand source code
@property
def time(self) -> str:
    """The event timestamp, which can be specified by the service originating the event.

    If the event spans a time interval, the service might choose to report the start time, so
    this value can be noticeably before the time the event is actually received.
    """
    return self["time"]
var version : str

By default, this is set to 0 (zero) in all events.

Expand source code
@property
def version(self) -> str:
    """By default, this is set to 0 (zero) in all events."""
    return self["version"]

Inherited members

class KafkaEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Self-managed or MSK Apache Kafka event trigger Documentation:


Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KafkaEvent(DictWrapper):
    """Self-managed or MSK Apache Kafka event trigger
    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    @property
    def event_source(self) -> str:
        """The AWS service from which the Kafka event record originated."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> Optional[str]:
        """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
        return self.get("eventSourceArn")

    @property
    def bootstrap_servers(self) -> str:
        """The Kafka bootstrap URL."""
        return self["bootstrapServers"]

    @property
    def decoded_bootstrap_servers(self) -> List[str]:
        """The decoded Kafka bootstrap URL."""
        return self.bootstrap_servers.split(",")

    @property
    def records(self) -> Iterator[KafkaEventRecord]:
        """The Kafka records."""
        for chunk in self["records"].values():
            for record in chunk:
                yield KafkaEventRecord(data=record, json_deserializer=self._json_deserializer)

    @property
    def record(self) -> KafkaEventRecord:
        """The next Kafka record."""
        return next(self.records)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var bootstrap_servers : str

The Kafka bootstrap URL.

Expand source code
@property
def bootstrap_servers(self) -> str:
    """The Kafka bootstrap URL."""
    return self["bootstrapServers"]
var decoded_bootstrap_servers : List[str]

The decoded Kafka bootstrap URL.

Expand source code
@property
def decoded_bootstrap_servers(self) -> List[str]:
    """The decoded Kafka bootstrap URL."""
    return self.bootstrap_servers.split(",")
var event_source : str

The AWS service from which the Kafka event record originated.

Expand source code
@property
def event_source(self) -> str:
    """The AWS service from which the Kafka event record originated."""
    return self["eventSource"]
var event_source_arn : Optional[str]

The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.

Expand source code
@property
def event_source_arn(self) -> Optional[str]:
    """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
    return self.get("eventSourceArn")
var recordKafkaEventRecord

The next Kafka record.

Expand source code
@property
def record(self) -> KafkaEventRecord:
    """The next Kafka record."""
    return next(self.records)
var records : Iterator[KafkaEventRecord]

The Kafka records.

Expand source code
@property
def records(self) -> Iterator[KafkaEventRecord]:
    """The Kafka records."""
    for chunk in self["records"].values():
        for record in chunk:
            yield KafkaEventRecord(data=record, json_deserializer=self._json_deserializer)

Inherited members

class KinesisFirehoseEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Kinesis Data Firehose event

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KinesisFirehoseEvent(DictWrapper):
    """Kinesis Data Firehose event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    @property
    def invocation_id(self) -> str:
        """Unique ID for for Lambda invocation"""
        return self["invocationId"]

    @property
    def delivery_stream_arn(self) -> str:
        """ARN of the Firehose Data Firehose Delivery Stream"""
        return self["deliveryStreamArn"]

    @property
    def source_kinesis_stream_arn(self) -> Optional[str]:
        """ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
        return self.get("sourceKinesisStreamArn")

    @property
    def region(self) -> str:
        """AWS region where the event originated eg: us-east-1"""
        return self["region"]

    @property
    def records(self) -> Iterator[KinesisFirehoseRecord]:
        for record in self["records"]:
            yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var delivery_stream_arn : str

ARN of the Firehose Data Firehose Delivery Stream

Expand source code
@property
def delivery_stream_arn(self) -> str:
    """ARN of the Firehose Data Firehose Delivery Stream"""
    return self["deliveryStreamArn"]
var invocation_id : str

Unique ID for for Lambda invocation

Expand source code
@property
def invocation_id(self) -> str:
    """Unique ID for for Lambda invocation"""
    return self["invocationId"]
var records : Iterator[KinesisFirehoseRecord]
Expand source code
@property
def records(self) -> Iterator[KinesisFirehoseRecord]:
    for record in self["records"]:
        yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)
var region : str

AWS region where the event originated eg: us-east-1

Expand source code
@property
def region(self) -> str:
    """AWS region where the event originated eg: us-east-1"""
    return self["region"]
var source_kinesis_stream_arn : Optional[str]

ARN of the Kinesis Stream; present only when Kinesis Stream is source

Expand source code
@property
def source_kinesis_stream_arn(self) -> Optional[str]:
    """ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
    return self.get("sourceKinesisStreamArn")

Inherited members

class KinesisStreamEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Kinesis stream event

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class KinesisStreamEvent(DictWrapper):
    """Kinesis stream event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
    """

    @property
    def records(self) -> Iterator[KinesisStreamRecord]:
        for record in self["Records"]:
            yield KinesisStreamRecord(record)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var records : Iterator[KinesisStreamRecord]
Expand source code
@property
def records(self) -> Iterator[KinesisStreamRecord]:
    for record in self["Records"]:
        yield KinesisStreamRecord(record)

Inherited members

class LambdaFunctionUrlEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

AWS Lambda Function URL event

Notes:

Lambda Function URL follows the API Gateway HTTP APIs Payload Format Version 2.0.

Keys related to API Gateway features not available in Function URL use a sentinel value (e.g.routeKey, stage).

Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html - https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html#urls-payloads

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class LambdaFunctionUrlEvent(APIGatewayProxyEventV2):
    """AWS Lambda Function URL event

    Notes:
    -----
    Lambda Function URL follows the API Gateway HTTP APIs Payload Format Version 2.0.

    Keys related to API Gateway features not available in Function URL use a sentinel value (e.g.`routeKey`, `stage`).

    Documentation:
    - https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html
    - https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html#urls-payloads
    """

    pass

Ancestors

Inherited members

class S3Event (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

S3 event notification

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class S3Event(DictWrapper):
    """S3 event notification

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
    - https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
    - https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
    """

    @property
    def records(self) -> Iterator[S3EventRecord]:
        for record in self["Records"]:
            yield S3EventRecord(record)

    @property
    def record(self) -> S3EventRecord:
        """Get the first s3 event record"""
        return next(self.records)

    @property
    def bucket_name(self) -> str:
        """Get the bucket name for the first s3 event record"""
        return self["Records"][0]["s3"]["bucket"]["name"]

    @property
    def object_key(self) -> str:
        """Get the object key for the first s3 event record and unquote plus"""
        return unquote_plus(self["Records"][0]["s3"]["object"]["key"])

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var bucket_name : str

Get the bucket name for the first s3 event record

Expand source code
@property
def bucket_name(self) -> str:
    """Get the bucket name for the first s3 event record"""
    return self["Records"][0]["s3"]["bucket"]["name"]
var object_key : str

Get the object key for the first s3 event record and unquote plus

Expand source code
@property
def object_key(self) -> str:
    """Get the object key for the first s3 event record and unquote plus"""
    return unquote_plus(self["Records"][0]["s3"]["object"]["key"])
var recordS3EventRecord

Get the first s3 event record

Expand source code
@property
def record(self) -> S3EventRecord:
    """Get the first s3 event record"""
    return next(self.records)
var records : Iterator[S3EventRecord]
Expand source code
@property
def records(self) -> Iterator[S3EventRecord]:
    for record in self["Records"]:
        yield S3EventRecord(record)

Inherited members

class S3EventBridgeNotificationEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Amazon S3EventBridge Event

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class S3EventBridgeNotificationEvent(EventBridgeEvent):
    """Amazon S3EventBridge Event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html
    """

    @property
    def detail(self) -> S3EventBridgeNotificationDetail:  # type: ignore[override]
        """S3 notification details"""
        return S3EventBridgeNotificationDetail(self["detail"])

Ancestors

  • EventBridgeEvent
  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var detailS3EventBridgeNotificationDetail

S3 notification details

Expand source code
@property
def detail(self) -> S3EventBridgeNotificationDetail:  # type: ignore[override]
    """S3 notification details"""
    return S3EventBridgeNotificationDetail(self["detail"])

Inherited members

class SESEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Amazon SES to receive message event trigger

NOTE: There is a 30-second timeout on RequestResponse invocations.

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class SESEvent(DictWrapper):
    """Amazon SES to receive message event trigger

    NOTE: There is a 30-second timeout on RequestResponse invocations.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-ses.html
    - https://docs.aws.amazon.com/ses/latest/DeveloperGuide/receiving-email-action-lambda.html
    """

    @property
    def records(self) -> Iterator[SESEventRecord]:
        for record in self["Records"]:
            yield SESEventRecord(record)

    @property
    def record(self) -> SESEventRecord:
        return next(self.records)

    @property
    def mail(self) -> SESMail:
        return self.record.ses.mail

    @property
    def receipt(self) -> SESReceipt:
        return self.record.ses.receipt

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var mailSESMail
Expand source code
@property
def mail(self) -> SESMail:
    return self.record.ses.mail
var receiptSESReceipt
Expand source code
@property
def receipt(self) -> SESReceipt:
    return self.record.ses.receipt
var recordSESEventRecord
Expand source code
@property
def record(self) -> SESEventRecord:
    return next(self.records)
var records : Iterator[SESEventRecord]
Expand source code
@property
def records(self) -> Iterator[SESEventRecord]:
    for record in self["Records"]:
        yield SESEventRecord(record)

Inherited members

class SNSEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

SNS Event

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class SNSEvent(DictWrapper):
    """SNS Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html
    """

    @property
    def records(self) -> Iterator[SNSEventRecord]:
        for record in self["Records"]:
            yield SNSEventRecord(record)

    @property
    def record(self) -> SNSEventRecord:
        """Return the first SNS event record"""
        return next(self.records)

    @property
    def sns_message(self) -> str:
        """Return the message for the first sns event record"""
        return self.record.sns.message

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var recordSNSEventRecord

Return the first SNS event record

Expand source code
@property
def record(self) -> SNSEventRecord:
    """Return the first SNS event record"""
    return next(self.records)
var records : Iterator[SNSEventRecord]
Expand source code
@property
def records(self) -> Iterator[SNSEventRecord]:
    for record in self["Records"]:
        yield SNSEventRecord(record)
var sns_message : str

Return the message for the first sns event record

Expand source code
@property
def sns_message(self) -> str:
    """Return the message for the first sns event record"""
    return self.record.sns.message

Inherited members

class SQSEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

SQS Event

Documentation:

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class SQSEvent(DictWrapper):
    """SQS Event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
    """

    @property
    def records(self) -> Iterator[SQSRecord]:
        for record in self["Records"]:
            yield SQSRecord(data=record, json_deserializer=self._json_deserializer)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var records : Iterator[SQSRecord]
Expand source code
@property
def records(self) -> Iterator[SQSRecord]:
    for record in self["Records"]:
        yield SQSRecord(data=record, json_deserializer=self._json_deserializer)

Inherited members

class VPCLatticeEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)

Provides a single read only access to a wrapper dict

Parameters

data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class VPCLatticeEvent(DictWrapper):
    @property
    def body(self) -> str:
        """The VPC Lattice body."""
        return self["body"]

    @property
    def json_body(self) -> Any:
        """Parses the submitted body as json"""
        if self._json_data is None:
            self._json_data = self._json_deserializer(self.decoded_body)
        return self._json_data

    @property
    def headers(self) -> Dict[str, str]:
        """The VPC Lattice event headers."""
        return self["headers"]

    @property
    def is_base64_encoded(self) -> bool:
        """A boolean flag to indicate if the applicable request payload is Base64-encode"""
        return self["is_base64_encoded"]

    @property
    def decoded_body(self) -> str:
        """Dynamically base64 decode body as a str"""
        body: str = self["body"]
        if self.is_base64_encoded:
            return base64.b64decode(body.encode()).decode()
        return body

    @property
    def method(self) -> str:
        """The VPC Lattice method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT."""
        return self["method"]

    @property
    def query_string_parameters(self) -> Dict[str, str]:
        """The request query string parameters."""
        return self["query_string_parameters"]

    @property
    def raw_path(self) -> str:
        """The raw VPC Lattice request path."""
        return self["raw_path"]

    def get_query_string_value(self, name: str, default_value: Optional[str] = None) -> Optional[str]:
        """Get query string value by name

        Parameters
        ----------
        name: str
            Query string parameter name
        default_value: str, optional
            Default value if no value was found by name
        Returns
        -------
        str, optional
            Query string parameter value
        """
        params = self.query_string_parameters
        return default_value if params is None else params.get(name, default_value)

    def get_header_value(
        self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False
    ) -> Optional[str]:
        """Get header value by name

        Parameters
        ----------
        name: str
            Header name
        default_value: str, optional
            Default value if no value was found by name
        case_sensitive: bool
            Whether to use a case-sensitive look up
        Returns
        -------
        str, optional
            Header value
        """
        return get_header_value(self.headers, name, default_value, case_sensitive)

Ancestors

  • DictWrapper
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Instance variables

var body : str

The VPC Lattice body.

Expand source code
@property
def body(self) -> str:
    """The VPC Lattice body."""
    return self["body"]
var decoded_body : str

Dynamically base64 decode body as a str

Expand source code
@property
def decoded_body(self) -> str:
    """Dynamically base64 decode body as a str"""
    body: str = self["body"]
    if self.is_base64_encoded:
        return base64.b64decode(body.encode()).decode()
    return body
var headers : Dict[str, str]

The VPC Lattice event headers.

Expand source code
@property
def headers(self) -> Dict[str, str]:
    """The VPC Lattice event headers."""
    return self["headers"]
var is_base64_encoded : bool

A boolean flag to indicate if the applicable request payload is Base64-encode

Expand source code
@property
def is_base64_encoded(self) -> bool:
    """A boolean flag to indicate if the applicable request payload is Base64-encode"""
    return self["is_base64_encoded"]
var json_body : Any

Parses the submitted body as json

Expand source code
@property
def json_body(self) -> Any:
    """Parses the submitted body as json"""
    if self._json_data is None:
        self._json_data = self._json_deserializer(self.decoded_body)
    return self._json_data
var method : str

The VPC Lattice method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT.

Expand source code
@property
def method(self) -> str:
    """The VPC Lattice method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT."""
    return self["method"]
var query_string_parameters : Dict[str, str]

The request query string parameters.

Expand source code
@property
def query_string_parameters(self) -> Dict[str, str]:
    """The request query string parameters."""
    return self["query_string_parameters"]
var raw_path : str

The raw VPC Lattice request path.

Expand source code
@property
def raw_path(self) -> str:
    """The raw VPC Lattice request path."""
    return self["raw_path"]

Methods

def get_header_value(self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False) ‑> Optional[str]

Get header value by name

Parameters

name : str
Header name
default_value : str, optional
Default value if no value was found by name
case_sensitive : bool
Whether to use a case-sensitive look up

Returns

str, optional
Header value
Expand source code
def get_header_value(
    self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False
) -> Optional[str]:
    """Get header value by name

    Parameters
    ----------
    name: str
        Header name
    default_value: str, optional
        Default value if no value was found by name
    case_sensitive: bool
        Whether to use a case-sensitive look up
    Returns
    -------
    str, optional
        Header value
    """
    return get_header_value(self.headers, name, default_value, case_sensitive)
def get_query_string_value(self, name: str, default_value: Optional[str] = None) ‑> Optional[str]

Get query string value by name

Parameters

name : str
Query string parameter name
default_value : str, optional
Default value if no value was found by name

Returns

str, optional
Query string parameter value
Expand source code
def get_query_string_value(self, name: str, default_value: Optional[str] = None) -> Optional[str]:
    """Get query string value by name

    Parameters
    ----------
    name: str
        Query string parameter name
    default_value: str, optional
        Default value if no value was found by name
    Returns
    -------
    str, optional
        Query string parameter value
    """
    params = self.query_string_parameters
    return default_value if params is None else params.get(name, default_value)

Inherited members