Module aws_lambda_powertools.utilities.data_classes
The Event Source Data Classes utility provides self-describing classes for Lambda event sources.
Expand source code
"""
Event Source Data Classes utility provides classes self-describing Lambda event sources.
"""
from .alb_event import ALBEvent
from .api_gateway_proxy_event import APIGatewayProxyEvent, APIGatewayProxyEventV2
from .appsync_resolver_event import AppSyncResolverEvent
from .aws_config_rule_event import AWSConfigRuleEvent
from .bedrock_agent_event import BedrockAgentEvent
from .cloud_watch_custom_widget_event import CloudWatchDashboardCustomWidgetEvent
from .cloud_watch_logs_event import CloudWatchLogsEvent
from .code_pipeline_job_event import CodePipelineJobEvent
from .connect_contact_flow_event import ConnectContactFlowEvent
from .dynamo_db_stream_event import DynamoDBStreamEvent
from .event_bridge_event import EventBridgeEvent
from .event_source import event_source
from .kafka_event import KafkaEvent
from .kinesis_firehose_event import (
KinesisFirehoseDataTransformationRecord,
KinesisFirehoseDataTransformationRecordMetadata,
KinesisFirehoseDataTransformationResponse,
KinesisFirehoseEvent,
)
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_event import S3Event, S3EventBridgeNotificationEvent
from .secrets_manager_event import SecretsManagerEvent
from .ses_event import SESEvent
from .sns_event import SNSEvent
from .sqs_event import SQSEvent
from .vpc_lattice import VPCLatticeEvent, VPCLatticeEventV2
__all__ = [
"APIGatewayProxyEvent",
"APIGatewayProxyEventV2",
"SecretsManagerEvent",
"AppSyncResolverEvent",
"ALBEvent",
"BedrockAgentEvent",
"CloudWatchDashboardCustomWidgetEvent",
"CloudWatchLogsEvent",
"CodePipelineJobEvent",
"ConnectContactFlowEvent",
"DynamoDBStreamEvent",
"EventBridgeEvent",
"KafkaEvent",
"KinesisFirehoseEvent",
"KinesisStreamEvent",
"KinesisFirehoseDataTransformationResponse",
"KinesisFirehoseDataTransformationRecord",
"KinesisFirehoseDataTransformationRecordMetadata",
"LambdaFunctionUrlEvent",
"S3Event",
"S3EventBridgeNotificationEvent",
"SESEvent",
"SNSEvent",
"SQSEvent",
"event_source",
"AWSConfigRuleEvent",
"VPCLatticeEvent",
"VPCLatticeEventV2",
]
Sub-modules
aws_lambda_powertools.utilities.data_classes.active_mq_event
aws_lambda_powertools.utilities.data_classes.alb_event
aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event
aws_lambda_powertools.utilities.data_classes.api_gateway_proxy_event
aws_lambda_powertools.utilities.data_classes.appsync
aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event
aws_lambda_powertools.utilities.data_classes.appsync_resolver_event
aws_lambda_powertools.utilities.data_classes.aws_config_rule_event
aws_lambda_powertools.utilities.data_classes.bedrock_agent_event
aws_lambda_powertools.utilities.data_classes.cloud_watch_custom_widget_event
aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event
aws_lambda_powertools.utilities.data_classes.code_pipeline_job_event
aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event
aws_lambda_powertools.utilities.data_classes.common
aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event
aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event
aws_lambda_powertools.utilities.data_classes.event_bridge_event
aws_lambda_powertools.utilities.data_classes.kafka_event
aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event
aws_lambda_powertools.utilities.data_classes.kinesis_stream_event
aws_lambda_powertools.utilities.data_classes.lambda_function_url_event
aws_lambda_powertools.utilities.data_classes.rabbit_mq_event
aws_lambda_powertools.utilities.data_classes.s3_event
aws_lambda_powertools.utilities.data_classes.s3_object_event
aws_lambda_powertools.utilities.data_classes.secrets_manager_event
aws_lambda_powertools.utilities.data_classes.ses_event
aws_lambda_powertools.utilities.data_classes.shared_functions
aws_lambda_powertools.utilities.data_classes.sns_event
aws_lambda_powertools.utilities.data_classes.sqs_event
aws_lambda_powertools.utilities.data_classes.vpc_lattice
Functions
def event_source(handler: Callable[[Any, LambdaContext], Any], event: Dict[str, Any], context: LambdaContext, data_class: Type[DictWrapper])
-
Middleware to create an instance of the passed in event source data class
Parameters
handler
:Callable
- Lambda's handler
event
:Dict
- Lambda's Event
context
:LambdaContext
- Lambda's Context
data_class
:Type[DictWrapper]
- Data class type to instantiate
Example
Sample usage
from aws_lambda_powertools.utilities.data_classes import S3Event, event_source

@event_source(data_class=S3Event)
def handler(event: S3Event, context):
    return {"key": event.object_key}
Expand source code
@lambda_handler_decorator
def event_source(
    handler: Callable[[Any, LambdaContext], Any],
    event: Dict[str, Any],
    context: LambdaContext,
    data_class: Type[DictWrapper],
):
    """Wrap the incoming event in ``data_class`` and hand it to the handler.

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: LambdaContext
        Lambda's Context
    data_class: Type[DictWrapper]
        Data class type to instantiate

    Example
    --------
    **Sample usage**

        from aws_lambda_powertools.utilities.data_classes import S3Event, event_source

        @event_source(data_class=S3Event)
        def handler(event: S3Event, context):
            return {"key": event.object_key}
    """
    # Build the data class instance first, then delegate to the real handler.
    typed_event = data_class(event)
    return handler(typed_event, context)
Classes
class ALBEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Application load balancer event
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/services-alb.html
- https://docs.aws.amazon.com/elasticloadbalancing/latest/application/lambda-functions.html
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class ALBEvent(BaseProxyEvent):
    """Application load balancer event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-alb.html
    - https://docs.aws.amazon.com/elasticloadbalancing/latest/application/lambda-functions.html
    """

    @property
    def request_context(self) -> ALBEventRequestContext:
        """Request context wrapper built from the event payload (``self._data``)."""
        return ALBEventRequestContext(self._data)

    @property
    def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]:
        """The ``multiValueQueryStringParameters`` entry of the event, if present."""
        return self.get("multiValueQueryStringParameters")

    @property
    def multi_value_headers(self) -> Optional[Dict[str, List[str]]]:
        """The ``multiValueHeaders`` entry of the event, if present."""
        return self.get("multiValueHeaders")

    def header_serializer(self) -> BaseHeadersSerializer:
        """Return the headers serializer matching this event's header format."""
        # With the ALB integration the `multiValueHeaders` feature is either
        # disabled (default) or enabled. A truthy `multiValueHeaders` entry on
        # the event is used as the signal that the feature is on.
        serializer_type = (
            MultiValueHeadersSerializer if self.multi_value_headers else SingleValueHeadersSerializer
        )
        return serializer_type()
Ancestors
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var multi_value_headers : Optional[Dict[str, List[str]]]
-
Expand source code
@property def multi_value_headers(self) -> Optional[Dict[str, List[str]]]: return self.get("multiValueHeaders")
var multi_value_query_string_parameters : Optional[Dict[str, List[str]]]
-
Expand source code
@property def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]: return self.get("multiValueQueryStringParameters")
var request_context : ALBEventRequestContext
-
Expand source code
@property def request_context(self) -> ALBEventRequestContext: return ALBEventRequestContext(self._data)
Methods
def header_serializer(self) ‑> BaseHeadersSerializer
-
Expand source code
def header_serializer(self) -> BaseHeadersSerializer:
    """Return the headers serializer matching this event's header format."""
    # With the ALB integration the `multiValueHeaders` feature is either
    # disabled (default) or enabled. A truthy `multiValueHeaders` entry on the
    # event is used as the signal that the feature is on.
    serializer_type = (
        MultiValueHeadersSerializer if self.multi_value_headers else SingleValueHeadersSerializer
    )
    return serializer_type()
Inherited members
class APIGatewayProxyEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
AWS Lambda proxy V1
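Documentation:
- https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html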
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class APIGatewayProxyEvent(BaseProxyEvent):
    """AWS Lambda proxy V1

    Documentation:
    --------------
    - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html
    """

    @property
    def version(self) -> str:
        """Value stored under the event's ``version`` key."""
        return self["version"]

    @property
    def resource(self) -> str:
        """Value stored under the event's ``resource`` key."""
        return self["resource"]

    @property
    def multi_value_headers(self) -> Dict[str, List[str]]:
        """Value stored under the event's ``multiValueHeaders`` key."""
        return self["multiValueHeaders"]

    @property
    def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]:
        """The ``multiValueQueryStringParameters`` entry of the event, if present."""
        return self.get("multiValueQueryStringParameters")

    @property
    def request_context(self) -> APIGatewayEventRequestContext:
        """Request context wrapper built from the event payload (``self._data``)."""
        return APIGatewayEventRequestContext(self._data)

    @property
    def path_parameters(self) -> Optional[Dict[str, str]]:
        """The ``pathParameters`` entry of the event, if present."""
        return self.get("pathParameters")

    @property
    def stage_variables(self) -> Optional[Dict[str, str]]:
        """The ``stageVariables`` entry of the event, if present."""
        return self.get("stageVariables")

    def header_serializer(self) -> BaseHeadersSerializer:
        """Return a new ``MultiValueHeadersSerializer`` instance."""
        return MultiValueHeadersSerializer()
Ancestors
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var multi_value_headers : Dict[str, List[str]]
-
Expand source code
@property def multi_value_headers(self) -> Dict[str, List[str]]: return self["multiValueHeaders"]
var multi_value_query_string_parameters : Optional[Dict[str, List[str]]]
-
Expand source code
@property def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]: return self.get("multiValueQueryStringParameters")
var path_parameters : Optional[Dict[str, str]]
-
Expand source code
@property def path_parameters(self) -> Optional[Dict[str, str]]: return self.get("pathParameters")
var request_context : APIGatewayEventRequestContext
-
Expand source code
@property def request_context(self) -> APIGatewayEventRequestContext: return APIGatewayEventRequestContext(self._data)
var resource : str
-
Expand source code
@property def resource(self) -> str: return self["resource"]
var stage_variables : Optional[Dict[str, str]]
-
Expand source code
@property def stage_variables(self) -> Optional[Dict[str, str]]: return self.get("stageVariables")
var version : str
-
Expand source code
@property def version(self) -> str: return self["version"]
Methods
def header_serializer(self) ‑> BaseHeadersSerializer
-
Expand source code
def header_serializer(self) -> BaseHeadersSerializer: return MultiValueHeadersSerializer()
Inherited members
class APIGatewayProxyEventV2 (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
AWS Lambda proxy V2 event
Notes:
Format 2.0 doesn't have multiValueHeaders or multiValueQueryStringParameters fields. Duplicate headers are combined with commas and included in the headers field. Duplicate query strings are combined with commas and included in the queryStringParameters field.
Format 2.0 includes a new cookies field. All cookie headers in the request are combined with commas and added to the cookies field. In the response to the client, each cookie becomes a set-cookie header.
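Documentation:
- https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html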
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class APIGatewayProxyEventV2(BaseProxyEvent):
    """AWS Lambda proxy V2 event

    Notes:
    -----
    Format 2.0 doesn't have multiValueHeaders or multiValueQueryStringParameters fields. Duplicate headers
    are combined with commas and included in the headers field. Duplicate query strings are combined with
    commas and included in the queryStringParameters field.

    Format 2.0 includes a new cookies field. All cookie headers in the request are combined with commas and
    added to the cookies field. In the response to the client, each cookie becomes a set-cookie header.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html
    """

    @property
    def version(self) -> str:
        """Value stored under the event's ``version`` key."""
        return self["version"]

    @property
    def route_key(self) -> str:
        """Value stored under the event's ``routeKey`` key."""
        return self["routeKey"]

    @property
    def raw_path(self) -> str:
        """Value stored under the event's ``rawPath`` key."""
        return self["rawPath"]

    @property
    def raw_query_string(self) -> str:
        """Value stored under the event's ``rawQueryString`` key."""
        return self["rawQueryString"]

    @property
    def cookies(self) -> Optional[List[str]]:
        """The ``cookies`` entry of the event, if present."""
        return self.get("cookies")

    @property
    def request_context(self) -> RequestContextV2:
        """Request context wrapper built from the event payload (``self._data``)."""
        return RequestContextV2(self._data)

    @property
    def path_parameters(self) -> Optional[Dict[str, str]]:
        """The ``pathParameters`` entry of the event, if present."""
        return self.get("pathParameters")

    @property
    def stage_variables(self) -> Optional[Dict[str, str]]:
        """The ``stageVariables`` entry of the event, if present."""
        return self.get("stageVariables")

    @property
    def path(self) -> str:
        """Return ``raw_path`` with a leading ``/<stage>`` segment removed.

        For the ``$default`` stage the raw path is returned unchanged. For any
        other stage the prefix is removed only when it really is the first path
        segment; a raw path that does not start with the stage is returned as is.
        """
        raw_path = self.raw_path
        stage = self.request_context.stage
        if stage == "$default":
            return raw_path

        stage_prefix = "/" + stage
        # Slicing blindly by the prefix length would cut arbitrary characters
        # off a raw path that does not begin with the stage segment (and would
        # also mangle e.g. "/devices" for a stage named "dev").
        if raw_path == stage_prefix or raw_path.startswith(stage_prefix + "/"):
            return raw_path[len(stage_prefix) :]
        return raw_path

    @property
    def http_method(self) -> str:
        """The HTTP method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT."""
        return self.request_context.http.method

    def header_serializer(self):
        """Return a new ``HttpApiHeadersSerializer`` instance."""
        return HttpApiHeadersSerializer()
Ancestors
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Subclasses
Instance variables
var cookies : Optional[List[str]]
-
Expand source code
@property def cookies(self) -> Optional[List[str]]: return self.get("cookies")
var path : str
-
Expand source code
@property
def path(self) -> str:
    """Return ``raw_path`` with a leading ``/<stage>`` segment removed.

    For the ``$default`` stage the raw path is returned unchanged. For any
    other stage the prefix is removed only when it really is the first path
    segment; a raw path that does not start with the stage is returned as is.
    """
    raw_path = self.raw_path
    stage = self.request_context.stage
    if stage == "$default":
        return raw_path

    stage_prefix = "/" + stage
    # Slicing blindly by the prefix length would cut arbitrary characters off a
    # raw path that does not begin with the stage segment (and would also
    # mangle e.g. "/devices" for a stage named "dev").
    if raw_path == stage_prefix or raw_path.startswith(stage_prefix + "/"):
        return raw_path[len(stage_prefix) :]
    return raw_path
var path_parameters : Optional[Dict[str, str]]
-
Expand source code
@property def path_parameters(self) -> Optional[Dict[str, str]]: return self.get("pathParameters")
var raw_path : str
-
Expand source code
@property def raw_path(self) -> str: return self["rawPath"]
var raw_query_string : str
-
Expand source code
@property def raw_query_string(self) -> str: return self["rawQueryString"]
var request_context : RequestContextV2
-
Expand source code
@property def request_context(self) -> RequestContextV2: return RequestContextV2(self._data)
var route_key : str
-
Expand source code
@property def route_key(self) -> str: return self["routeKey"]
var stage_variables : Optional[Dict[str, str]]
-
Expand source code
@property def stage_variables(self) -> Optional[Dict[str, str]]: return self.get("stageVariables")
var version : str
-
Expand source code
@property def version(self) -> str: return self["version"]
Methods
def header_serializer(self)
-
Expand source code
def header_serializer(self): return HttpApiHeadersSerializer()
Inherited members
class AWSConfigRuleEvent (data: Dict[str, Any])
-
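Events for AWS Config Rules
Documentation:
- https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_lambda-functions.html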
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class AWSConfigRuleEvent(DictWrapper):
    """Events for AWS Config Rules

    Documentation:
    --------------
    - https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_lambda-functions.html
    """

    def __init__(self, data: Dict[str, Any]):
        super().__init__(data)
        # Caches for the JSON-encoded fields; filled on first property access.
        self._invoking_event: Optional[Any] = None
        self._rule_parameters: Optional[Any] = None

    @property
    def version(self) -> str:
        """The version of the event."""
        return self["version"]

    @property
    def invoking_event(
        self,
    ) -> AWSConfigConfigurationChanged | AWSConfigScheduledNotification | AWSConfigOversizedConfiguration:
        """The invoking payload of the event."""
        # Only the deserialized payload is cached; the wrapper object is
        # rebuilt by get_invoke_event on every access.
        if self._invoking_event is None:
            self._invoking_event = self._json_deserializer(self["invokingEvent"])

        return get_invoke_event(self._invoking_event)

    @property
    def raw_invoking_event(self) -> str:
        """The raw invoking payload of the event."""
        return self["invokingEvent"]

    @property
    def rule_parameters(self) -> Dict:
        """The parameters of the event."""
        if self._rule_parameters is None:
            self._rule_parameters = self._json_deserializer(self["ruleParameters"])

        return self._rule_parameters

    @property
    def result_token(self) -> str:
        """The result token of the event."""
        return self["resultToken"]

    @property
    def event_left_scope(self) -> bool:
        """The left scope of the event."""
        return self["eventLeftScope"]

    @property
    def execution_role_arn(self) -> str:
        """The execution role arn of the event."""
        return self["executionRoleArn"]

    @property
    def config_rule_arn(self) -> str:
        """The arn of the rule of the event."""
        return self["configRuleArn"]

    @property
    def config_rule_name(self) -> str:
        """The name of the rule of the event."""
        return self["configRuleName"]

    @property
    def config_rule_id(self) -> str:
        """The id of the rule of the event."""
        return self["configRuleId"]

    @property
    def accountid(self) -> str:
        """The accountid of the event."""
        return self["accountId"]

    @property
    def evaluation_mode(self) -> Optional[str]:
        """The evaluation mode of the event."""
        return self.get("evaluationMode")

    @property
    def evalution_mode(self) -> Optional[str]:
        """The evaluation mode of the event.

        Misspelled name kept for backward compatibility; prefer
        ``evaluation_mode``.
        """
        return self.get("evaluationMode")
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var accountid : str
-
The accountid of the event.
Expand source code
@property def accountid(self) -> str: """The accountid of the event.""" return self["accountId"]
var config_rule_arn : str
-
The arn of the rule of the event.
Expand source code
@property def config_rule_arn(self) -> str: """The arn of the rule of the event.""" return self["configRuleArn"]
var config_rule_id : str
-
The id of the rule of the event.
Expand source code
@property def config_rule_id(self) -> str: """The id of the rule of the event.""" return self["configRuleId"]
var config_rule_name : str
-
The name of the rule of the event.
Expand source code
@property def config_rule_name(self) -> str: """The name of the rule of the event.""" return self["configRuleName"]
var evalution_mode : Optional[str]
-
The evaluation mode of the event.
Expand source code
@property def evalution_mode(self) -> Optional[str]: """The evalution mode of the event.""" return self.get("evaluationMode")
var event_left_scope : bool
-
The left scope of the event.
Expand source code
@property def event_left_scope(self) -> bool: """The left scope of the event.""" return self["eventLeftScope"]
var execution_role_arn : str
-
The execution role arn of the event.
Expand source code
@property def execution_role_arn(self) -> str: """The execution role arn of the event.""" return self["executionRoleArn"]
var invoking_event : AWSConfigConfigurationChanged | AWSConfigScheduledNotification | AWSConfigOversizedConfiguration
-
The invoking payload of the event.
Expand source code
@property def invoking_event( self, ) -> AWSConfigConfigurationChanged | AWSConfigScheduledNotification | AWSConfigOversizedConfiguration: """The invoking payload of the event.""" if self._invoking_event is None: self._invoking_event = self._json_deserializer(self["invokingEvent"]) return get_invoke_event(self._invoking_event)
var raw_invoking_event : str
-
The raw invoking payload of the event.
Expand source code
@property def raw_invoking_event(self) -> str: """The raw invoking payload of the event.""" return self["invokingEvent"]
var result_token : str
-
The result token of the event.
Expand source code
@property def result_token(self) -> str: """The result token of the event.""" return self["resultToken"]
var rule_parameters : Dict
-
The parameters of the event.
Expand source code
@property
def rule_parameters(self) -> Dict:
    """The parameters of the event."""
    # Deserialize the ``ruleParameters`` payload on first access only and
    # reuse the stored result afterwards.
    cached = self._rule_parameters
    if cached is None:
        cached = self._rule_parameters = self._json_deserializer(self["ruleParameters"])
    return cached
var version : str
-
The version of the event.
Expand source code
@property def version(self) -> str: """The version of the event.""" return self["version"]
Inherited members
class AppSyncResolverEvent (data: dict)
-
AppSync resolver event
NOTE: AppSync Resolver Events can come in various shapes; this data class supports both the Amplify GraphQL directive @function and Direct Lambda Resolver.
Documentation:
- https://docs.aws.amazon.com/appsync/latest/devguide/resolver-context-reference.html
- https://docs.amplify.aws/cli/graphql-transformer/function#structure-of-the-function-event
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class AppSyncResolverEvent(DictWrapper):
    """AppSync resolver event

    **NOTE:** AppSync Resolver Events can come in various shapes; this data class
    supports both the Amplify GraphQL directive @function and Direct Lambda Resolver

    Documentation:
    -------------
    - https://docs.aws.amazon.com/appsync/latest/devguide/resolver-context-reference.html
    - https://docs.amplify.aws/cli/graphql-transformer/function#structure-of-the-function-event
    """

    def __init__(self, data: dict):
        super().__init__(data)

        # When the payload has no (or an empty) ``info`` section, build one
        # from the top-level ``fieldName`` / ``typeName`` entries instead.
        resolver_info: Optional[dict] = data.get("info") or {
            "fieldName": self.get("fieldName"),
            "parentTypeName": self.get("typeName"),
        }
        self._info = AppSyncResolverEventInfo(resolver_info)

    @property
    def type_name(self) -> str:
        """The name of the parent type for the field that is currently being resolved."""
        return self.info.parent_type_name

    @property
    def field_name(self) -> str:
        """The name of the field that is currently being resolved."""
        return self.info.field_name

    @property
    def arguments(self) -> Dict[str, Any]:
        """A map that contains all GraphQL arguments for this field."""
        return self["arguments"]

    @property
    def identity(self) -> Union[None, AppSyncIdentityIAM, AppSyncIdentityCognito]:
        """An object that contains information about the caller.

        Depending on the type of identity found:

        - API_KEY authorization - returns None
        - AWS_IAM authorization - returns AppSyncIdentityIAM
        - AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito
        """
        raw_identity = self.get("identity")
        return get_identity_object(raw_identity)

    @property
    def source(self) -> Optional[Dict[str, Any]]:
        """A map that contains the resolution of the parent field."""
        return self.get("source")

    @property
    def request_headers(self) -> Dict[str, str]:
        """Request headers"""
        return self["request"]["headers"]

    @property
    def prev_result(self) -> Optional[Dict[str, Any]]:
        """It represents the result of whatever previous operation was executed in a pipeline resolver."""
        previous = self.get("prev")
        return previous.get("result") if previous else None

    @property
    def info(self) -> AppSyncResolverEventInfo:
        """The info section contains information about the GraphQL request."""
        return self._info

    @property
    def stash(self) -> Optional[dict]:
        """The stash is a map that is made available inside each resolver and function mapping template.
        The same stash instance lives through a single resolver execution. This means that you can use the
        stash to pass arbitrary data across request and response mapping templates, and across functions in
        a pipeline resolver."""
        return self.get("stash")

    def get_header_value(
        self,
        name: str,
        default_value: Optional[str] = None,
        case_sensitive: Optional[bool] = False,
    ) -> Optional[str]:
        """Get header value by name

        Parameters
        ----------
        name: str
            Header name
        default_value: str, optional
            Default value if no value was found by name
        case_sensitive: bool
            Whether to use a case-sensitive look up
        Returns
        -------
        str, optional
            Header value
        """
        # Delegates to the module-level ``get_header_value`` helper, looking
        # the name up in this event's request headers.
        headers = self.request_headers
        return get_header_value(headers, name, default_value, case_sensitive)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var arguments : Dict[str, Any]
-
A map that contains all GraphQL arguments for this field.
Expand source code
@property def arguments(self) -> Dict[str, Any]: """A map that contains all GraphQL arguments for this field.""" return self["arguments"]
var field_name : str
-
The name of the field that is currently being resolved.
Expand source code
@property def field_name(self) -> str: """The name of the field that is currently being resolved.""" return self.info.field_name
var identity : Union[ForwardRef(None), AppSyncIdentityIAM, AppSyncIdentityCognito]
-
An object that contains information about the caller.
Depending on the type of identity found:
- API_KEY authorization - returns None
- AWS_IAM authorization - returns AppSyncIdentityIAM
- AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito
Expand source code
@property def identity(self) -> Union[None, AppSyncIdentityIAM, AppSyncIdentityCognito]: """An object that contains information about the caller. Depending on the type of identify found: - API_KEY authorization - returns None - AWS_IAM authorization - returns AppSyncIdentityIAM - AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito """ return get_identity_object(self.get("identity"))
var info : AppSyncResolverEventInfo
-
The info section contains information about the GraphQL request.
Expand source code
@property def info(self) -> AppSyncResolverEventInfo: """The info section contains information about the GraphQL request.""" return self._info
var prev_result : Optional[Dict[str, Any]]
-
It represents the result of whatever previous operation was executed in a pipeline resolver.
Expand source code
@property
def prev_result(self) -> Optional[Dict[str, Any]]:
    """It represents the result of whatever previous operation was executed in a pipeline resolver."""
    # A missing or empty ``prev`` section yields None; otherwise return its
    # ``result`` entry (None when that entry is absent).
    previous = self.get("prev")
    return previous.get("result") if previous else None
var request_headers : Dict[str, str]
-
Request headers
Expand source code
@property def request_headers(self) -> Dict[str, str]: """Request headers""" return self["request"]["headers"]
var source : Optional[Dict[str, Any]]
-
A map that contains the resolution of the parent field.
Expand source code
@property def source(self) -> Optional[Dict[str, Any]]: """A map that contains the resolution of the parent field.""" return self.get("source")
var stash : Optional[dict]
-
The stash is a map that is made available inside each resolver and function mapping template. The same stash instance lives through a single resolver execution. This means that you can use the stash to pass arbitrary data across request and response mapping templates, and across functions in a pipeline resolver.
Expand source code
@property def stash(self) -> Optional[dict]: """The stash is a map that is made available inside each resolver and function mapping template. The same stash instance lives through a single resolver execution. This means that you can use the stash to pass arbitrary data across request and response mapping templates, and across functions in a pipeline resolver.""" return self.get("stash")
var type_name : str
-
The name of the parent type for the field that is currently being resolved.
Expand source code
@property def type_name(self) -> str: """The name of the parent type for the field that is currently being resolved.""" return self.info.parent_type_name
Methods
def get_header_value(self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False) ‑> Optional[str]
-
Get header value by name
Parameters
name
:str
- Header name
default_value
:str
, optional- Default value if no value was found by name
case_sensitive
:bool
- Whether to use a case-sensitive look up
Returns
str
, optional- Header value
Expand source code
def get_header_value( self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False, ) -> Optional[str]: """Get header value by name Parameters ---------- name: str Header name default_value: str, optional Default value if no value was found by name case_sensitive: bool Whether to use a case-sensitive look up Returns ------- str, optional Header value """ return get_header_value(self.request_headers, name, default_value, case_sensitive)
Inherited members
class BedrockAgentEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Bedrock Agent input event
See https://docs.aws.amazon.com/bedrock/latest/userguide/agents-create.html
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Expand source code
class BedrockAgentEvent(BaseProxyEvent):
    """
    Bedrock Agent input event

    See https://docs.aws.amazon.com/bedrock/latest/userguide/agents-create.html
    """

    @property
    def message_version(self) -> str:
        """Value stored under the event's ``messageVersion`` key."""
        return self["messageVersion"]

    @property
    def input_text(self) -> str:
        """Value stored under the event's ``inputText`` key."""
        return self["inputText"]

    @property
    def session_id(self) -> str:
        """Value stored under the event's ``sessionId`` key."""
        return self["sessionId"]

    @property
    def action_group(self) -> str:
        """Value stored under the event's ``actionGroup`` key."""
        return self["actionGroup"]

    @property
    def api_path(self) -> str:
        """Value stored under the event's ``apiPath`` key."""
        return self["apiPath"]

    @property
    def http_method(self) -> str:
        """Value stored under the event's ``httpMethod`` key."""
        return self["httpMethod"]

    @property
    def parameters(self) -> Optional[List[BedrockAgentProperty]]:
        """Each ``parameters`` entry wrapped in ``BedrockAgentProperty``; None when missing or empty."""
        if not self.get("parameters"):
            return None
        return [BedrockAgentProperty(item) for item in self["parameters"]]

    @property
    def request_body(self) -> Optional[BedrockAgentRequestBody]:
        """The ``requestBody`` entry wrapped in ``BedrockAgentRequestBody``; None when missing or empty."""
        if not self.get("requestBody"):
            return None
        return BedrockAgentRequestBody(self["requestBody"])

    @property
    def agent(self) -> BedrockAgentInfo:
        """The ``agent`` entry wrapped in ``BedrockAgentInfo``."""
        return BedrockAgentInfo(self["agent"])

    @property
    def session_attributes(self) -> Dict[str, str]:
        """Value stored under the event's ``sessionAttributes`` key."""
        return self["sessionAttributes"]

    @property
    def prompt_session_attributes(self) -> Dict[str, str]:
        """Value stored under the event's ``promptSessionAttributes`` key."""
        return self["promptSessionAttributes"]

    # The following methods add compatibility with BaseProxyEvent
    @property
    def path(self) -> str:
        """Same value as ``api_path`` (the event's ``apiPath`` key)."""
        return self["apiPath"]

    @property
    def query_string_parameters(self) -> Optional[Dict[str, str]]:
        """Name-to-value mapping of every ``parameters`` entry; None when missing or empty."""
        # In Bedrock Agent events, query string parameters are passed as undifferentiated parameters,
        # together with the other parameters. So we just return all parameters here.
        if not self.get("parameters"):
            return None
        return {item["name"]: item["value"] for item in self["parameters"]}
Ancestors
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var action_group : str
-
Expand source code
@property def action_group(self) -> str: return self["actionGroup"]
var agent : BedrockAgentInfo
-
Expand source code
@property def agent(self) -> BedrockAgentInfo: return BedrockAgentInfo(self["agent"])
var api_path : str
-
Expand source code
@property def api_path(self) -> str: return self["apiPath"]
var input_text : str
-
Expand source code
@property def input_text(self) -> str: return self["inputText"]
var message_version : str
-
Expand source code
@property def message_version(self) -> str: return self["messageVersion"]
var parameters : Optional[List[BedrockAgentProperty]]
-
Expand source code
@property def parameters(self) -> Optional[List[BedrockAgentProperty]]: return [BedrockAgentProperty(x) for x in self["parameters"]] if self.get("parameters") else None
var path : str
-
Expand source code
@property def path(self) -> str: return self["apiPath"]
var prompt_session_attributes : Dict[str, str]
-
Expand source code
@property def prompt_session_attributes(self) -> Dict[str, str]: return self["promptSessionAttributes"]
var query_string_parameters : Optional[Dict[str, str]]
-
Expand source code
@property
def query_string_parameters(self) -> Optional[Dict[str, str]]:
    """Name-to-value mapping of every ``parameters`` entry; None when missing or empty."""
    # In Bedrock Agent events, query string parameters are passed as undifferentiated parameters,
    # together with the other parameters. So we just return all parameters here.
    if not self.get("parameters"):
        return None
    return {item["name"]: item["value"] for item in self["parameters"]}
var request_body : Optional[BedrockAgentRequestBody]
-
Expand source code
@property
def request_body(self) -> Optional[BedrockAgentRequestBody]:
    """The ``requestBody`` key wrapped as ``BedrockAgentRequestBody``.

    Returns ``None`` when the key is missing or its value is empty.
    """
    if not self.get("requestBody"):
        return None
    return BedrockAgentRequestBody(self["requestBody"])
var session_attributes : Dict[str, str]
-
Expand source code
@property
def session_attributes(self) -> Dict[str, str]:
    """Value of the ``sessionAttributes`` key of the event."""
    return self["sessionAttributes"]
var session_id : str
-
Expand source code
@property
def session_id(self) -> str:
    """Value of the ``sessionId`` key of the event."""
    return self["sessionId"]
Inherited members
class CloudWatchDashboardCustomWidgetEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
CloudWatch dashboard custom widget event
You can use a Lambda function to create a custom widget on a CloudWatch dashboard.
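Documentation: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/add_custom_widget_dashboard_about.html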
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class CloudWatchDashboardCustomWidgetEvent(DictWrapper):
    """CloudWatch dashboard custom widget event

    You can use a Lambda function to create a custom widget on a CloudWatch dashboard.

    Documentation:
    -------------
    - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/add_custom_widget_dashboard_about.html
    """

    @property
    def describe(self) -> bool:
        """Display widget documentation (truthiness of the ``describe`` key, ``False`` when absent)"""
        describe_flag = self.get("describe", False)
        return bool(describe_flag)

    @property
    def widget_context(self) -> Optional[CloudWatchWidgetContext]:
        """The widget context, or ``None`` when ``widgetContext`` is missing or empty"""
        has_context = self.get("widgetContext")
        return CloudWatchWidgetContext(self["widgetContext"]) if has_context else None
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var describe : bool
-
Display widget documentation
Expand source code
@property
def describe(self) -> bool:
    """Display widget documentation (truthiness of the ``describe`` key, ``False`` when absent)"""
    describe_flag = self.get("describe", False)
    return bool(describe_flag)
var widget_context : Optional[CloudWatchWidgetContext]
-
The widget context
Expand source code
@property
def widget_context(self) -> Optional[CloudWatchWidgetContext]:
    """The widget context, or ``None`` when ``widgetContext`` is missing or empty"""
    has_context = self.get("widgetContext")
    return CloudWatchWidgetContext(self["widgetContext"]) if has_context else None
Inherited members
class CloudWatchLogsEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
CloudWatch Logs log stream event
You can use a Lambda function to monitor and analyze logs from an Amazon CloudWatch Logs log stream.
Documentation: https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class CloudWatchLogsEvent(DictWrapper):
    """CloudWatch Logs log stream event

    You can use a Lambda function to monitor and analyze logs from an Amazon CloudWatch Logs log stream.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html
    """

    # Per-instance caches, filled on first use so the payload is decoded and parsed only once
    _decompressed_logs_data = None
    _json_logs_data = None

    @property
    def raw_logs_data(self) -> str:
        """The Base64 encoded, compressed value of the ``awslogs.data`` field."""
        awslogs = self["awslogs"]
        return awslogs["data"]

    @property
    def decompress_logs_data(self) -> bytes:
        """Decode and decompress log data"""
        if self._decompressed_logs_data is not None:
            return self._decompressed_logs_data

        compressed = base64.b64decode(self.raw_logs_data)
        # ``MAX_WBITS | 32`` makes zlib detect a zlib or gzip header automatically
        self._decompressed_logs_data = zlib.decompress(compressed, zlib.MAX_WBITS | 32)
        return self._decompressed_logs_data

    def parse_logs_data(self) -> CloudWatchLogsDecodedData:
        """Decode, decompress and parse json data as CloudWatchLogsDecodedData"""
        if self._json_logs_data is None:
            deserialize = self._json_deserializer
            self._json_logs_data = deserialize(self.decompress_logs_data.decode("UTF-8"))
        return CloudWatchLogsDecodedData(self._json_logs_data)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var decompress_logs_data : bytes
-
Decode and decompress log data
Expand source code
@property
def decompress_logs_data(self) -> bytes:
    """Decode and decompress log data"""
    if self._decompressed_logs_data is not None:
        return self._decompressed_logs_data

    compressed = base64.b64decode(self.raw_logs_data)
    # ``MAX_WBITS | 32`` makes zlib detect a zlib or gzip header automatically
    self._decompressed_logs_data = zlib.decompress(compressed, zlib.MAX_WBITS | 32)
    return self._decompressed_logs_data
var raw_logs_data : str
-
The value of the `data` field is a Base64 encoded ZIP archive.
Expand source code
@property
def raw_logs_data(self) -> str:
    """The Base64 encoded, compressed value of the ``awslogs.data`` field."""
    awslogs = self["awslogs"]
    return awslogs["data"]
Methods
def parse_logs_data(self) ‑> CloudWatchLogsDecodedData
-
Decode, decompress and parse json data as CloudWatchLogsDecodedData
Expand source code
def parse_logs_data(self) -> CloudWatchLogsDecodedData:
    """Decode, decompress and parse json data as CloudWatchLogsDecodedData"""
    if self._json_logs_data is None:
        deserialize = self._json_deserializer
        self._json_logs_data = deserialize(self.decompress_logs_data.decode("UTF-8"))
    return CloudWatchLogsDecodedData(self._json_logs_data)
Inherited members
class CodePipelineJobEvent (data: Dict[str, Any])
-
AWS CodePipeline Job Event
Documentation:
- https://docs.aws.amazon.com/codepipeline/latest/userguide/actions-invoke-lambda-function.html
- https://docs.aws.amazon.com/lambda/latest/dg/services-codepipeline.html
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class CodePipelineJobEvent(DictWrapper):
    """AWS CodePipeline Job Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/codepipeline/latest/userguide/actions-invoke-lambda-function.html
    - https://docs.aws.amazon.com/lambda/latest/dg/services-codepipeline.html
    """

    def __init__(self, data: Dict[str, Any]):
        super().__init__(data)
        # Keep a direct handle on the job payload that the accessors below read from
        self._job = self["CodePipeline.job"]

    @property
    def get_id(self) -> str:
        """Job id"""
        return self._job["id"]

    @property
    def account_id(self) -> str:
        """Account id"""
        return self._job["accountId"]

    @property
    def data(self) -> CodePipelineData:
        """Code pipeline job data"""
        job_data = self._job["data"]
        return CodePipelineData(job_data)

    @property
    def user_parameters(self) -> Optional[str]:
        """Action configuration user parameters"""
        configuration = self.data.action_configuration.configuration
        return configuration.user_parameters

    @property
    def decoded_user_parameters(self) -> Optional[Dict[str, Any]]:
        """Json Decoded action configuration user parameters"""
        configuration = self.data.action_configuration.configuration
        return configuration.decoded_user_parameters

    @property
    def input_bucket_name(self) -> str:
        """Get the first input artifact bucket name"""
        first_artifact = self.data.input_artifacts[0]
        return first_artifact.location.s3_location.bucket_name

    @property
    def input_object_key(self) -> str:
        """Get the first input artifact object key (unquote plus)"""
        first_artifact = self.data.input_artifacts[0]
        return first_artifact.location.s3_location.object_key

    def setup_s3_client(self):
        """Creates an S3 client

        Uses the credentials passed in the event by CodePipeline. These
        credentials can be used to access the artifact bucket.

        Returns
        -------
        BaseClient
            An S3 client with the appropriate credentials
        """
        # IMPORTING boto3 within the FUNCTION and not at the top level to get
        # it only when we explicitly want it for better performance.
        import boto3

        from aws_lambda_powertools.shared import user_agent

        client = boto3.client(
            "s3",
            aws_access_key_id=self.data.artifact_credentials.access_key_id,
            aws_secret_access_key=self.data.artifact_credentials.secret_access_key,
            aws_session_token=self.data.artifact_credentials.session_token,
        )
        user_agent.register_feature_to_client(client=client, feature="data_classes")
        return client

    def find_input_artifact(self, artifact_name: str) -> Optional[CodePipelineArtifact]:
        """Find an input artifact by artifact name

        Parameters
        ----------
        artifact_name : str
            The name of the input artifact to look for

        Returns
        -------
        CodePipelineArtifact, None
            Matching CodePipelineArtifact if found
        """
        matches = (candidate for candidate in self.data.input_artifacts if candidate.name == artifact_name)
        return next(matches, None)

    def get_artifact(self, artifact_name: str, filename: str) -> Optional[str]:
        """Get a file within an artifact zip on s3

        Parameters
        ----------
        artifact_name : str
            Name of the S3 artifact to download
        filename : str
            The file name within the artifact zip to extract as a string

        Returns
        -------
        str, None
            Returns the file contents as a string, or None when no input artifact has that name
        """
        artifact = self.find_input_artifact(artifact_name)
        if artifact is None:
            return None

        # The archive is downloaded to a temporary file that is removed when the block exits
        with tempfile.NamedTemporaryFile() as tmp_file:
            s3 = self.setup_s3_client()
            bucket = artifact.location.s3_location.bucket_name
            key = artifact.location.s3_location.key
            s3.download_file(bucket, key, tmp_file.name)
            with zipfile.ZipFile(tmp_file.name, "r") as archive:
                member_bytes = archive.read(filename)
                return member_bytes.decode("UTF-8")
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var account_id : str
-
Account id
Expand source code
@property
def account_id(self) -> str:
    """Account id (the ``accountId`` key of the job payload)"""
    job = self._job
    return job["accountId"]
var data : CodePipelineData
-
Code pipeline job data
Expand source code
@property
def data(self) -> CodePipelineData:
    """Code pipeline job data"""
    job_data = self._job["data"]
    return CodePipelineData(job_data)
var decoded_user_parameters : Optional[Dict[str, Any]]
-
Json Decoded action configuration user parameters
Expand source code
@property
def decoded_user_parameters(self) -> Optional[Dict[str, Any]]:
    """Json Decoded action configuration user parameters"""
    configuration = self.data.action_configuration.configuration
    return configuration.decoded_user_parameters
var get_id : str
-
Job id
Expand source code
@property
def get_id(self) -> str:
    """Job id (the ``id`` key of the job payload)"""
    job = self._job
    return job["id"]
var input_bucket_name : str
-
Get the first input artifact bucket name
Expand source code
@property
def input_bucket_name(self) -> str:
    """Get the first input artifact bucket name"""
    first_artifact = self.data.input_artifacts[0]
    return first_artifact.location.s3_location.bucket_name
var input_object_key : str
-
Get the first input artifact object key (unquote plus)
Expand source code
@property
def input_object_key(self) -> str:
    """Get the first input artifact object key (unquote plus)"""
    first_artifact = self.data.input_artifacts[0]
    return first_artifact.location.s3_location.object_key
var user_parameters : Optional[str]
-
Action configuration user parameters
Expand source code
@property
def user_parameters(self) -> Optional[str]:
    """Action configuration user parameters"""
    configuration = self.data.action_configuration.configuration
    return configuration.user_parameters
Methods
def find_input_artifact(self, artifact_name: str) ‑> Optional[CodePipelineArtifact]
-
Find an input artifact by artifact name
Parameters
artifact_name
:str
- The name of the input artifact to look for
Returns
CodePipelineArtifact, None
- Matching CodePipelineArtifact if found
Expand source code
def find_input_artifact(self, artifact_name: str) -> Optional[CodePipelineArtifact]:
    """Find an input artifact by artifact name

    Parameters
    ----------
    artifact_name : str
        The name of the input artifact to look for

    Returns
    -------
    CodePipelineArtifact, None
        The first input artifact whose name matches, or None when there is no match
    """
    matches = (candidate for candidate in self.data.input_artifacts if candidate.name == artifact_name)
    return next(matches, None)
def get_artifact(self, artifact_name: str, filename: str) ‑> Optional[str]
-
Get a file within an artifact zip on s3
Parameters
artifact_name
:str
- Name of the S3 artifact to download
filename
:str
- The file name within the artifact zip to extract as a string
Returns
str, None
- Returns the file contents as a string
Expand source code
def get_artifact(self, artifact_name: str, filename: str) -> Optional[str]:
    """Get a file within an artifact zip on s3

    Parameters
    ----------
    artifact_name : str
        Name of the S3 artifact to download
    filename : str
        The file name within the artifact zip to extract as a string

    Returns
    -------
    str, None
        Returns the file contents as a string, or None when no input artifact has that name
    """
    artifact = self.find_input_artifact(artifact_name)
    if artifact is None:
        return None

    # The archive is downloaded to a temporary file that is removed when the block exits
    with tempfile.NamedTemporaryFile() as tmp_file:
        s3 = self.setup_s3_client()
        bucket = artifact.location.s3_location.bucket_name
        key = artifact.location.s3_location.key
        s3.download_file(bucket, key, tmp_file.name)
        with zipfile.ZipFile(tmp_file.name, "r") as archive:
            member_bytes = archive.read(filename)
            return member_bytes.decode("UTF-8")
def setup_s3_client(self)
-
Creates an S3 client
Uses the credentials passed in the event by CodePipeline. These credentials can be used to access the artifact bucket.
Returns
BaseClient
- An S3 client with the appropriate credentials
Expand source code
def setup_s3_client(self):
    """Creates an S3 client

    Uses the credentials passed in the event by CodePipeline. These
    credentials can be used to access the artifact bucket.

    Returns
    -------
    BaseClient
        An S3 client with the appropriate credentials
    """
    # IMPORTING boto3 within the FUNCTION and not at the top level to get
    # it only when we explicitly want it for better performance.
    import boto3

    from aws_lambda_powertools.shared import user_agent

    client = boto3.client(
        "s3",
        aws_access_key_id=self.data.artifact_credentials.access_key_id,
        aws_secret_access_key=self.data.artifact_credentials.secret_access_key,
        aws_session_token=self.data.artifact_credentials.session_token,
    )
    user_agent.register_feature_to_client(client=client, feature="data_classes")
    return client
Inherited members
class ConnectContactFlowEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Amazon Connect contact flow event
Documentation: https://docs.aws.amazon.com/connect/latest/adminguide/connect-lambda-functions.html
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class ConnectContactFlowEvent(DictWrapper):
    """Amazon Connect contact flow event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/connect/latest/adminguide/connect-lambda-functions.html
    """

    @property
    def contact_data(self) -> ConnectContactFlowData:
        """This is always passed by Amazon Connect for every contact. Some parameters are optional."""
        details = self["Details"]
        return ConnectContactFlowData(details["ContactData"])

    @property
    def parameters(self) -> Dict[str, str]:
        """These are parameters specific to this call that were defined when you created the Lambda function."""
        details = self["Details"]
        return details["Parameters"]
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var contact_data : ConnectContactFlowData
-
This is always passed by Amazon Connect for every contact. Some parameters are optional.
Expand source code
@property
def contact_data(self) -> ConnectContactFlowData:
    """This is always passed by Amazon Connect for every contact. Some parameters are optional."""
    details = self["Details"]
    return ConnectContactFlowData(details["ContactData"])
var parameters : Dict[str, str]
-
These are parameters specific to this call that were defined when you created the Lambda function.
Expand source code
@property
def parameters(self) -> Dict[str, str]:
    """These are parameters specific to this call that were defined when you created the Lambda function."""
    details = self["Details"]
    return details["Parameters"]
Inherited members
class DynamoDBStreamEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Dynamo DB Stream Event
Documentation: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html
Example
Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.
from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent from aws_lambda_powertools.utilities.typing import LambdaContext @event_source(data_class=DynamoDBStreamEvent) def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): for record in event.records: # {"N": "123.45"} => Decimal("123.45") key: str = record.dynamodb.keys["id"] print(key)
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class DynamoDBStreamEvent(DictWrapper):
    """Dynamo DB Stream Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html

    Example
    -------
    **Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.**

        from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
        from aws_lambda_powertools.utilities.typing import LambdaContext


        @event_source(data_class=DynamoDBStreamEvent)
        def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
            for record in event.records:
                # {"N": "123.45"} => Decimal("123.45")
                key: str = record.dynamodb.keys["id"]
                print(key)
    """

    @property
    def records(self) -> Iterator[DynamoDBRecord]:
        """Lazily yield each entry of the ``Records`` key wrapped as ``DynamoDBRecord``."""
        yield from map(DynamoDBRecord, self["Records"])
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var records : Iterator[DynamoDBRecord]
-
Expand source code
@property
def records(self) -> Iterator[DynamoDBRecord]:
    """Lazily yield each entry of the ``Records`` key wrapped as ``DynamoDBRecord``."""
    yield from map(DynamoDBRecord, self["Records"])
Inherited members
class EventBridgeEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Amazon EventBridge Event
Documentation: https://docs.aws.amazon.com/eventbridge/latest/userguide/aws-events.html
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class EventBridgeEvent(DictWrapper):
    """Amazon EventBridge Event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/eventbridge/latest/userguide/aws-events.html
    """

    @property
    def get_id(self) -> str:
        """A unique value is generated for every event. This can be helpful in tracing events as
        they move through rules to targets, and are processed."""
        # Note: this name conflicts with existing python builtins
        return self["id"]

    @property
    def version(self) -> str:
        """By default, this is set to 0 (zero) in all events."""
        return self["version"]

    @property
    def account(self) -> str:
        """The 12-digit number identifying an AWS account."""
        return self["account"]

    @property
    def time(self) -> str:
        """The event timestamp, which can be specified by the service originating the event.

        If the event spans a time interval, the service might choose to report the start time, so
        this value can be noticeably before the time the event is actually received.
        """
        return self["time"]

    @property
    def region(self) -> str:
        """Identifies the AWS region where the event originated."""
        return self["region"]

    @property
    def resources(self) -> List[str]:
        """This JSON array contains ARNs that identify resources that are involved in the event.
        Inclusion of these ARNs is at the discretion of the service."""
        return self["resources"]

    @property
    def source(self) -> str:
        """Identifies the service that sourced the event. All events sourced from within AWS begin with "aws." """
        return self["source"]

    @property
    def detail_type(self) -> str:
        """Identifies, in combination with the source field, the fields and values that appear in the detail field."""
        return self["detail-type"]

    @property
    def detail(self) -> Dict[str, Any]:
        """A JSON object, whose content is at the discretion of the service originating the event."""
        return self["detail"]

    @property
    def replay_name(self) -> Optional[str]:
        """Identifies whether the event is being replayed and what is the name of the replay.

        Returns
        -------
        str, None
            Value of the ``replay-name`` key, or ``None`` when the event does not carry that key.
        """
        # The key is optional (see the return annotation), so look it up with ``get``
        # instead of indexing, which would raise ``KeyError`` when it is absent.
        return self.get("replay-name")
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Subclasses
Instance variables
var account : str
-
The 12-digit number identifying an AWS account.
Expand source code
@property
def account(self) -> str:
    """The 12-digit number identifying an AWS account (the ``account`` key)."""
    return self["account"]
var detail : Dict[str, Any]
-
A JSON object, whose content is at the discretion of the service originating the event.
Expand source code
@property
def detail(self) -> Dict[str, Any]:
    """A JSON object (the ``detail`` key) whose content is at the discretion of the originating service."""
    return self["detail"]
var detail_type : str
-
Identifies, in combination with the source field, the fields and values that appear in the detail field.
Expand source code
@property
def detail_type(self) -> str:
    """Identifies, in combination with the source field, the fields and values that appear in the
    detail field (read from the ``detail-type`` key)."""
    return self["detail-type"]
var get_id : str
-
A unique value is generated for every event. This can be helpful in tracing events as they move through rules to targets, and are processed.
Expand source code
@property
def get_id(self) -> str:
    """A unique value is generated for every event (the ``id`` key). This can be helpful in
    tracing events as they move through rules to targets, and are processed."""
    # Note: this name conflicts with existing python builtins
    return self["id"]
var region : str
-
Identifies the AWS region where the event originated.
Expand source code
@property
def region(self) -> str:
    """Identifies the AWS region where the event originated (the ``region`` key)."""
    return self["region"]
var replay_name : Optional[str]
-
Identifies whether the event is being replayed and what is the name of the replay.
Expand source code
@property
def replay_name(self) -> Optional[str]:
    """Identifies whether the event is being replayed and what is the name of the replay.

    Returns
    -------
    str, None
        Value of the ``replay-name`` key, or ``None`` when the event does not carry that key.
    """
    # The key is optional (see the return annotation), so look it up with ``get``
    # instead of indexing, which would raise ``KeyError`` when it is absent.
    return self.get("replay-name")
var resources : List[str]
-
This JSON array contains ARNs that identify resources that are involved in the event. Inclusion of these ARNs is at the discretion of the service.
Expand source code
@property
def resources(self) -> List[str]:
    """This JSON array (the ``resources`` key) contains ARNs that identify resources that are
    involved in the event. Inclusion of these ARNs is at the discretion of the service."""
    return self["resources"]
var source : str
-
Identifies the service that sourced the event. All events sourced from within AWS begin with "aws."
Expand source code
@property
def source(self) -> str:
    """Identifies the service that sourced the event (the ``source`` key).

    All events sourced from within AWS begin with "aws."
    """
    return self["source"]
var time : str
-
The event timestamp, which can be specified by the service originating the event.
If the event spans a time interval, the service might choose to report the start time, so this value can be noticeably before the time the event is actually received.
Expand source code
@property
def time(self) -> str:
    """The event timestamp (the ``time`` key), which can be specified by the service originating the event.

    If the event spans a time interval, the service might choose to report the start time, so
    this value can be noticeably before the time the event is actually received.
    """
    return self["time"]
var version : str
-
By default, this is set to 0 (zero) in all events.
Expand source code
@property
def version(self) -> str:
    """The ``version`` key; by default, this is set to 0 (zero) in all events."""
    return self["version"]
Inherited members
class KafkaEvent (data: Dict[str, Any])
-
Self-managed or MSK Apache Kafka event trigger
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Expand source code
class KafkaEvent(DictWrapper):
    """Self-managed or MSK Apache Kafka event trigger

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    def __init__(self, data: Dict[str, Any]):
        super().__init__(data)
        # Iterator backing the ``record`` property; created lazily on first access
        self._records: Optional[Iterator[KafkaEventRecord]] = None

    @property
    def event_source(self) -> str:
        """The AWS service from which the Kafka event record originated."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> Optional[str]:
        """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
        return self.get("eventSourceArn")

    @property
    def bootstrap_servers(self) -> str:
        """The Kafka bootstrap URL."""
        return self["bootstrapServers"]

    @property
    def decoded_bootstrap_servers(self) -> List[str]:
        """The decoded Kafka bootstrap URL (``bootstrap_servers`` split on commas)."""
        servers = self.bootstrap_servers
        return servers.split(",")

    @property
    def records(self) -> Iterator[KafkaEventRecord]:
        """The Kafka records."""
        for batch in self["records"].values():
            yield from (
                KafkaEventRecord(data=raw_record, json_deserializer=self._json_deserializer)
                for raw_record in batch
            )

    @property
    def record(self) -> KafkaEventRecord:
        """
        Returns the next Kafka record using an iterator.

        Returns
        -------
        KafkaEventRecord
            The next Kafka record.

        Raises
        ------
        StopIteration
            If there are no more records available.

        """
        record_iterator = self._records
        if record_iterator is None:
            # First access: start a single shared iterator over ``records``
            record_iterator = self._records = self.records
        return next(record_iterator)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var bootstrap_servers : str
-
The Kafka bootstrap URL.
Expand source code
@property
def bootstrap_servers(self) -> str:
    """The Kafka bootstrap URL (the ``bootstrapServers`` key)."""
    return self["bootstrapServers"]
var decoded_bootstrap_servers : List[str]
-
The decoded Kafka bootstrap URL.
Expand source code
@property
def decoded_bootstrap_servers(self) -> List[str]:
    """The decoded Kafka bootstrap URL (``bootstrap_servers`` split on commas)."""
    servers = self.bootstrap_servers
    return servers.split(",")
var event_source : str
-
The AWS service from which the Kafka event record originated.
Expand source code
@property
def event_source(self) -> str:
    """The AWS service from which the Kafka event record originated (the ``eventSource`` key)."""
    return self["eventSource"]
var event_source_arn : Optional[str]
-
The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.
Expand source code
@property
def event_source_arn(self) -> Optional[str]:
    """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.

    Read from the ``eventSourceArn`` key; ``None`` when the key is absent.
    """
    return self.get("eventSourceArn")
var record : KafkaEventRecord
-
Returns the next Kafka record using an iterator.
Returns
KafkaEventRecord
- The next Kafka record.
Raises
StopIteration
- If there are no more records available.
Expand source code
@property
def record(self) -> KafkaEventRecord:
    """
    Returns the next Kafka record using an iterator.

    Returns
    -------
    KafkaEventRecord
        The next Kafka record.

    Raises
    ------
    StopIteration
        If there are no more records available.

    """
    record_iterator = self._records
    if record_iterator is None:
        # First access: start a single shared iterator over ``records``
        record_iterator = self._records = self.records
    return next(record_iterator)
var records : Iterator[KafkaEventRecord]
-
The Kafka records.
Expand source code
@property
def records(self) -> Iterator[KafkaEventRecord]:
    """The Kafka records."""
    for batch in self["records"].values():
        yield from (
            KafkaEventRecord(data=raw_record, json_deserializer=self._json_deserializer)
            for raw_record in batch
        )
Inherited members
class KinesisFirehoseDataTransformationRecord (record_id: str, result: Literal['Ok', 'Dropped', 'ProcessingFailed'] = 'Ok', data: str = '', metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None, json_serializer: Callable = <function dumps>, json_deserializer: Callable = <function loads>)
-
Record in Kinesis Data Firehose response object.
Parameters
record_id
:str
- uniquely identifies this record within the current batch
result
:Literal["Ok", "Dropped", "ProcessingFailed"]
- record data transformation status, whether it succeeded, should be dropped, or failed.
data
:str
-
base64-encoded payload, by default empty string.
Use `data_from_text` or `data_from_json` methods to convert data if needed.
metadata
:Optional[KinesisFirehoseDataTransformationRecordMetadata]
-
Metadata associated with this record; can contain partition keys.
See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
json_serializer
:Callable
- function to serialize `obj` to a JSON formatted `str`, by default json.dumps
json_deserializer
:Callable
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
Documentation:
Expand source code
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationRecord:
    """Record in Kinesis Data Firehose response object.

    Parameters
    ----------
    record_id: str
        uniquely identifies this record within the current batch
    result: Literal["Ok", "Dropped", "ProcessingFailed"]
        record data transformation status, whether it succeeded, should be dropped, or failed.
    data: str
        base64-encoded payload, by default empty string.

        Use `data_from_text` or `data_from_json` methods to convert data if needed.

    metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata]
        Metadata associated with this record; can contain partition keys.

        See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    json_serializer: Callable
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer: Callable
        function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`,
        by default json.loads

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
    """

    _valid_result_types: ClassVar[Tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed")

    record_id: str
    result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
    data: str = ""
    metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None
    json_serializer: Callable = json.dumps
    json_deserializer: Callable = json.loads
    # Cache for ``data_as_json`` so the payload is deserialized at most once
    _json_data: Optional[Any] = None

    def asdict(self) -> Dict:
        """Return the record as a dict with ``recordId``, ``result``, ``data`` and, when set, ``metadata``.

        Emits a warning (and still returns the dict) when ``result`` is not one of the valid result types.
        """
        if self.result not in self._valid_result_types:
            warnings.warn(
                message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"',
                stacklevel=1,
            )

        record: Dict[str, Any] = {"recordId": self.record_id, "result": self.result, "data": self.data}
        if self.metadata:
            record["metadata"] = self.metadata.asdict()
        return record

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        return base64.b64decode(self.data) if self.data else b""

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        return self.data_as_bytes.decode("utf-8") if self.data else ""

    @property
    def data_as_json(self) -> Dict:
        """Decoded base64-encoded data loaded to json"""
        if self.data:
            if self._json_data is None:
                self._json_data = self.json_deserializer(self.data_as_text)
            return self._json_data
        return {}
Class variables
var data : str
var metadata : Optional[KinesisFirehoseDataTransformationRecordMetadata]
var record_id : str
var result : Literal['Ok', 'Dropped', 'ProcessingFailed']
Instance variables
var data_as_bytes : bytes
-
Decoded base64-encoded data as bytes
Expand source code
@property
def data_as_bytes(self) -> bytes:
    """Decoded base64-encoded data as bytes (empty bytes when ``data`` is empty)"""
    return base64.b64decode(self.data) if self.data else b""
var data_as_json : Dict
-
Decoded base64-encoded data loaded to json
Expand source code
@property
def data_as_json(self) -> Dict:
    """Decoded base64-encoded data loaded to json (empty dict when ``data`` is empty)"""
    if self.data:
        # Deserialize once and reuse the cached result on later accesses
        if self._json_data is None:
            self._json_data = self.json_deserializer(self.data_as_text)
        return self._json_data
    return {}
var data_as_text : str
-
Decoded base64-encoded data as text
Expand source code
@property
def data_as_text(self) -> str:
    """Decoded base64-encoded data as text (empty string when ``data`` is empty)"""
    return self.data_as_bytes.decode("utf-8") if self.data else ""
Methods
def asdict(self) ‑> Dict
-
Expand source code
def asdict(self) -> Dict:
    """Return the record as a dict with ``recordId``, ``result``, ``data`` and, when set, ``metadata``.

    Emits a warning (and still returns the dict) when ``result`` is not one of the valid result types.
    """
    if self.result not in self._valid_result_types:
        warnings.warn(
            message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"',
            stacklevel=1,
        )

    record: Dict[str, Any] = {"recordId": self.record_id, "result": self.result, "data": self.data}
    if self.metadata:
        record["metadata"] = self.metadata.asdict()
    return record
def json_deserializer(s, *, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw) ‑> Callable
-
Deserialize `s` (a `str`, `bytes` or `bytearray` instance containing a JSON document) to a Python object.
`object_hook` is an optional function that will be called with the result of any object literal decode (a `dict`). The return value of `object_hook` will be used instead of the `dict`. This feature can be used to implement custom decoders (e.g. JSON-RPC class hinting).
`object_pairs_hook` is an optional function that will be called with the result of any object literal decoded with an ordered list of pairs. The return value of `object_pairs_hook` will be used instead of the `dict`. This feature can be used to implement custom decoders. If `object_hook` is also defined, the `object_pairs_hook` takes priority.
`parse_float`, if specified, will be called with the string of every JSON float to be decoded. By default this is equivalent to float(num_str). This can be used to use another datatype or parser for JSON floats (e.g. decimal.Decimal).
`parse_int`, if specified, will be called with the string of every JSON int to be decoded. By default this is equivalent to int(num_str). This can be used to use another datatype or parser for JSON integers (e.g. float).
`parse_constant`, if specified, will be called with one of the following strings: -Infinity, Infinity, NaN. This can be used to raise an exception if invalid JSON numbers are encountered.
To use a custom `JSONDecoder` subclass, specify it with the `cls` kwarg; otherwise `JSONDecoder` is used.
Expand source code
def loads(s, *, cls=None, object_hook=None, parse_float=None,
        parse_int=None, parse_constant=None, object_pairs_hook=None, **kw):
    """Deserialize ``s`` (a ``str``, ``bytes`` or ``bytearray`` instance
    containing a JSON document) to a Python object.

    ``object_hook``, when given, is called with every decoded object literal
    (a ``dict``) and its return value replaces that ``dict``.

    ``object_pairs_hook``, when given, is called with every decoded object
    literal as an ordered list of pairs and its return value replaces the
    ``dict``. It takes priority over ``object_hook`` when both are defined.

    ``parse_float`` and ``parse_int``, when given, are called with the string
    of every JSON float / int to be decoded (defaults are equivalent to
    ``float(num_str)`` and ``int(num_str)``).

    ``parse_constant``, when given, is called with one of the strings
    ``-Infinity``, ``Infinity``, ``NaN``.

    To use a custom ``JSONDecoder`` subclass, pass it as ``cls``; otherwise
    ``JSONDecoder`` is used.
    """
    if isinstance(s, str):
        if s.startswith('\ufeff'):
            raise JSONDecodeError("Unexpected UTF-8 BOM (decode using utf-8-sig)",
                                  s, 0)
    elif isinstance(s, (bytes, bytearray)):
        s = s.decode(detect_encoding(s), 'surrogatepass')
    else:
        raise TypeError(f'the JSON object must be str, bytes or bytearray, '
                        f'not {s.__class__.__name__}')

    # Only options the caller actually supplied are forwarded to the decoder.
    candidates = {
        'object_hook': object_hook,
        'object_pairs_hook': object_pairs_hook,
        'parse_float': parse_float,
        'parse_int': parse_int,
        'parse_constant': parse_constant,
    }
    supplied = {name: value for name, value in candidates.items() if value is not None}

    # No customisation at all: reuse the module-level decoder instance.
    if cls is None and not supplied and not kw:
        return _default_decoder.decode(s)

    kw.update(supplied)
    decoder_cls = JSONDecoder if cls is None else cls
    return decoder_cls(**kw).decode(s)
def json_serializer(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, default=None, sort_keys=False, **kw) ‑> Callable
-
Serialize `obj` to a JSON formatted `str`.
If `skipkeys` is true then `dict` keys that are not basic types (`str`, `int`, `float`, `bool`, `None`) will be skipped instead of raising a `TypeError`.
If `ensure_ascii` is false, then the return value can contain non-ASCII characters if they appear in strings contained in `obj`. Otherwise, all such characters are escaped in JSON strings.
If `check_circular` is false, then the circular reference check for container types will be skipped and a circular reference will result in a `RecursionError` (or worse).
If `allow_nan` is false, then it will be a `ValueError` to serialize out of range `float` values (`nan`, `inf`, `-inf`) in strict compliance of the JSON specification, instead of using the JavaScript equivalents (`NaN`, `Infinity`, `-Infinity`).
If `indent` is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. `None` is the most compact representation.
If specified, `separators` should be an `(item_separator, key_separator)` tuple. The default is `(', ', ': ')` if indent is `None` and `(',', ': ')` otherwise. To get the most compact JSON representation, you should specify `(',', ':')` to eliminate whitespace.
`default(obj)` is a function that should return a serializable version of obj or raise TypeError. The default simply raises TypeError.
If sort_keys is true (default: `False`), then the output of dictionaries will be sorted by key.
To use a custom `JSONEncoder` subclass (e.g. one that overrides the `.default()` method to serialize additional types), specify it with the `cls` kwarg; otherwise `JSONEncoder` is used.
Expand source code
def dumps(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True,
        allow_nan=True, cls=None, indent=None, separators=None,
        default=None, sort_keys=False, **kw):
    """Serialize ``obj`` to a JSON formatted ``str``.

    ``skipkeys``: when true, ``dict`` keys that are not basic types (``str``,
    ``int``, ``float``, ``bool``, ``None``) are skipped instead of raising a
    ``TypeError``.

    ``ensure_ascii``: when false, the result may contain non-ASCII characters
    present in ``obj``; otherwise they are escaped.

    ``check_circular``: when false, the circular reference check for container
    types is skipped and a circular reference results in a ``RecursionError``
    (or worse).

    ``allow_nan``: when false, serializing out of range ``float`` values
    (``nan``, ``inf``, ``-inf``) is a ``ValueError`` instead of emitting the
    JavaScript equivalents (``NaN``, ``Infinity``, ``-Infinity``).

    ``indent``: a non-negative integer pretty-prints array elements and object
    members with that indent level (0 only inserts newlines); ``None`` is the
    most compact representation.

    ``separators``: an ``(item_separator, key_separator)`` tuple. The default is
    ``(', ', ': ')`` if *indent* is ``None`` and ``(',', ': ')`` otherwise.

    ``default(obj)``: a function returning a serializable version of obj or
    raising TypeError. The default simply raises TypeError.

    ``sort_keys``: when true (default: ``False``), dictionaries are output
    sorted by key.

    To use a custom ``JSONEncoder`` subclass, pass it as ``cls``; otherwise
    ``JSONEncoder`` is used.
    """
    # Every option left at its default: reuse the cached module-level encoder.
    all_defaults = (
        cls is None
        and indent is None
        and separators is None
        and default is None
        and ensure_ascii
        and check_circular
        and allow_nan
        and not (skipkeys or sort_keys or kw)
    )
    if all_defaults:
        return _default_encoder.encode(obj)

    encoder_cls = JSONEncoder if cls is None else cls
    encoder = encoder_cls(
        skipkeys=skipkeys,
        ensure_ascii=ensure_ascii,
        check_circular=check_circular,
        allow_nan=allow_nan,
        indent=indent,
        separators=separators,
        default=default,
        sort_keys=sort_keys,
        **kw,
    )
    return encoder.encode(obj)
class KinesisFirehoseDataTransformationRecordMetadata (partition_keys: Dict[str, str] = <factory>)
-
Metadata in Firehose Data Transform Record.
Parameters
partition_keys
:Dict[str, str]
- A dict of partition keys/value in string format, e.g.
{"year":"2023","month":"09"}
Documentation:
Expand source code
@dataclass(repr=False, order=False, frozen=True)
class KinesisFirehoseDataTransformationRecordMetadata:
    """
    Metadata in Firehose Data Transform Record.

    Parameters
    ----------
    partition_keys: Dict[str, str]
        A dict of partition keys/value in string format, e.g. `{"year":"2023","month":"09"}`

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    """

    # Each instance gets its own fresh dict when no keys are supplied.
    partition_keys: Dict[str, str] = field(default_factory=dict)

    def asdict(self) -> Dict:
        """Return ``{"partitionKeys": ...}``, or ``{}`` when ``partition_keys`` is ``None``."""
        keys = self.partition_keys
        return {} if keys is None else {"partitionKeys": keys}
Class variables
var partition_keys : Dict[str, str]
Methods
def asdict(self) ‑> Dict
-
Expand source code
def asdict(self) -> Dict:
    """Return ``{"partitionKeys": ...}``, or ``{}`` when ``partition_keys`` is ``None``."""
    keys = self.partition_keys
    return {} if keys is None else {"partitionKeys": keys}
class KinesisFirehoseDataTransformationResponse (records: List[KinesisFirehoseDataTransformationRecord] = <factory>)
-
Kinesis Data Firehose response object
Documentation:
Parameters
records
:List[KinesisFirehoseDataTransformationRecord]
- records of Kinesis Data Firehose response object,
optional parameter at start. can be added later using
add_record
function.
Examples
Transforming data records
from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, KinesisFirehoseEvent, ) from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext def lambda_handler(event: dict, context: LambdaContext): firehose_event = KinesisFirehoseEvent(event) result = KinesisFirehoseDataTransformationResponse() for record in firehose_event.records: payload = record.data_as_text # base64 decoded data as str ## generate data to return transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, data=base64_from_json(transformed_data), ) result.add_record(processed_record) # return transformed records return result.asdict()
Expand source code
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationResponse:
    """Kinesis Data Firehose response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

    Parameters
    ----------
    records : List[KinesisFirehoseDataTransformationRecord]
        records of Kinesis Data Firehose response object,
        optional parameter at start. can be added later using `add_record` function.

    Examples
    --------

    **Transforming data records**

    ```python
    from aws_lambda_powertools.utilities.data_classes import (
        KinesisFirehoseDataTransformationRecord,
        KinesisFirehoseDataTransformationResponse,
        KinesisFirehoseEvent,
    )
    from aws_lambda_powertools.utilities.serialization import base64_from_json
    from aws_lambda_powertools.utilities.typing import LambdaContext


    def lambda_handler(event: dict, context: LambdaContext):
        firehose_event = KinesisFirehoseEvent(event)
        result = KinesisFirehoseDataTransformationResponse()

        for record in firehose_event.records:
            payload = record.data_as_text  # base64 decoded data as str

            ## generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )

            result.add_record(processed_record)

        # return transformed records
        return result.asdict()
    ```
    """

    records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)

    def add_record(self, record: KinesisFirehoseDataTransformationRecord):
        """Append ``record`` to ``records``."""
        collected = self.records
        collected.append(record)

    def asdict(self) -> Dict:
        """Return ``{"records": [...]}`` built from each record's ``asdict()``.

        Raises
        ------
        ValueError
            When ``records`` is empty.
        """
        if self.records:
            serialized = []
            for entry in self.records:
                serialized.append(entry.asdict())
            return {"records": serialized}
        raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")
Class variables
var records : List[KinesisFirehoseDataTransformationRecord]
Methods
def add_record(self, record: KinesisFirehoseDataTransformationRecord)
-
Expand source code
def add_record(self, record: KinesisFirehoseDataTransformationRecord):
    """Append ``record`` to ``records``."""
    collected = self.records
    collected.append(record)
def asdict(self) ‑> Dict
-
Expand source code
def asdict(self) -> Dict:
    """Return ``{"records": [...]}`` built from each record's ``asdict()``.

    Raises
    ------
    ValueError
        When ``records`` is empty.
    """
    if self.records:
        serialized = []
        for entry in self.records:
            serialized.append(entry.asdict())
        return {"records": serialized}
    raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")
class KinesisFirehoseEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Kinesis Data Firehose event
Documentation:
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KinesisFirehoseEvent(DictWrapper):
    """Kinesis Data Firehose event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    @property
    def invocation_id(self) -> str:
        """Unique ID for the Lambda invocation"""
        invocation: str = self["invocationId"]
        return invocation

    @property
    def delivery_stream_arn(self) -> str:
        """ARN of the Kinesis Data Firehose Delivery Stream"""
        stream_arn: str = self["deliveryStreamArn"]
        return stream_arn

    @property
    def source_kinesis_stream_arn(self) -> Optional[str]:
        """ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
        source_arn: Optional[str] = self.get("sourceKinesisStreamArn")
        return source_arn

    @property
    def region(self) -> str:
        """AWS region where the event originated eg: us-east-1"""
        aws_region: str = self["region"]
        return aws_region

    @property
    def records(self) -> Iterator[KinesisFirehoseRecord]:
        """Yield each entry of ``records`` wrapped in a ``KinesisFirehoseRecord``."""
        yield from (
            KinesisFirehoseRecord(data=raw, json_deserializer=self._json_deserializer)
            for raw in self["records"]
        )
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var delivery_stream_arn : str
-
ARN of the Kinesis Data Firehose Delivery Stream
Expand source code
@property
def delivery_stream_arn(self) -> str:
    """ARN of the Kinesis Data Firehose Delivery Stream"""
    stream_arn: str = self["deliveryStreamArn"]
    return stream_arn
var invocation_id : str
-
Unique ID for the Lambda invocation
Expand source code
@property
def invocation_id(self) -> str:
    """Unique ID for the Lambda invocation"""
    invocation: str = self["invocationId"]
    return invocation
var records : Iterator[KinesisFirehoseRecord]
-
Expand source code
@property
def records(self) -> Iterator[KinesisFirehoseRecord]:
    """Yield each entry of ``records`` wrapped in a ``KinesisFirehoseRecord``."""
    yield from (
        KinesisFirehoseRecord(data=raw, json_deserializer=self._json_deserializer)
        for raw in self["records"]
    )
var region : str
-
AWS region where the event originated eg: us-east-1
Expand source code
@property
def region(self) -> str:
    """AWS region where the event originated, e.g. us-east-1"""
    aws_region: str = self["region"]
    return aws_region
var source_kinesis_stream_arn : Optional[str]
-
ARN of the Kinesis Stream; present only when Kinesis Stream is source
Expand source code
@property
def source_kinesis_stream_arn(self) -> Optional[str]:
    """ARN of the Kinesis Stream; ``None`` when the key is absent (Kinesis Stream is not the source)"""
    source_arn: Optional[str] = self.get("sourceKinesisStreamArn")
    return source_arn
Inherited members
class KinesisStreamEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Kinesis stream event
Documentation:
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KinesisStreamEvent(DictWrapper):
    """Kinesis stream event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
    """

    @property
    def records(self) -> Iterator[KinesisStreamRecord]:
        """Yield each entry of ``Records`` wrapped in a ``KinesisStreamRecord``."""
        yield from map(KinesisStreamRecord, self["Records"])
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var records : Iterator[KinesisStreamRecord]
-
Expand source code
@property
def records(self) -> Iterator[KinesisStreamRecord]:
    """Yield each entry of ``Records`` wrapped in a ``KinesisStreamRecord``."""
    yield from map(KinesisStreamRecord, self["Records"])
Inherited members
class LambdaFunctionUrlEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
AWS Lambda Function URL event
Notes:
Lambda Function URL follows the API Gateway HTTP APIs Payload Format Version 2.0.
Keys related to API Gateway features not available in Function URL use a sentinel value (e.g. `routeKey`, `stage`).
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html
- https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html#urls-payloads
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class LambdaFunctionUrlEvent(APIGatewayProxyEventV2):
    """AWS Lambda Function URL event

    Notes:
    -----
    Lambda Function URL follows the API Gateway HTTP APIs Payload Format Version 2.0.

    Keys related to API Gateway features not available in Function URL use a sentinel value
    (e.g. `routeKey`, `stage`).

    Documentation:
    - https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html
    - https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html#urls-payloads
    """

    # Adds no members of its own; everything is inherited from APIGatewayProxyEventV2.
Ancestors
- APIGatewayProxyEventV2
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Inherited members
class S3Event (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
S3 event notification
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
- https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
- https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class S3Event(DictWrapper):
    """S3 event notification

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
    - https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
    - https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
    """

    @property
    def records(self) -> Iterator[S3EventRecord]:
        """Yield each entry of ``Records`` wrapped in an ``S3EventRecord``."""
        yield from map(S3EventRecord, self["Records"])

    @property
    def record(self) -> S3EventRecord:
        """Get the first s3 event record"""
        record_iter = self.records
        return next(record_iter)

    @property
    def bucket_name(self) -> str:
        """Get the bucket name for the first s3 event record"""
        first = self["Records"][0]
        return first["s3"]["bucket"]["name"]

    @property
    def object_key(self) -> str:
        """Get the object key for the first s3 event record and unquote plus"""
        first = self["Records"][0]
        quoted_key = first["s3"]["object"]["key"]
        return unquote_plus(quoted_key)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var bucket_name : str
-
Get the bucket name for the first s3 event record
Expand source code
@property
def bucket_name(self) -> str:
    """Get the bucket name for the first s3 event record"""
    first = self["Records"][0]
    return first["s3"]["bucket"]["name"]
var object_key : str
-
Get the object key for the first s3 event record and unquote plus
Expand source code
@property
def object_key(self) -> str:
    """Get the object key for the first s3 event record, decoded with ``unquote_plus``"""
    first = self["Records"][0]
    quoted_key = first["s3"]["object"]["key"]
    return unquote_plus(quoted_key)
var record : S3EventRecord
-
Get the first s3 event record
Expand source code
@property
def record(self) -> S3EventRecord:
    """Get the first s3 event record"""
    record_iter = self.records
    return next(record_iter)
var records : Iterator[S3EventRecord]
-
Expand source code
@property
def records(self) -> Iterator[S3EventRecord]:
    """Yield each entry of ``Records`` wrapped in an ``S3EventRecord``."""
    yield from map(S3EventRecord, self["Records"])
Inherited members
class S3EventBridgeNotificationEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Amazon S3 EventBridge notification event
Documentation:
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class S3EventBridgeNotificationEvent(EventBridgeEvent):
    """Amazon S3 EventBridge notification event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html
    """

    @property
    def detail(self) -> S3EventBridgeNotificationDetail:  # type: ignore[override]
        """S3 notification details"""
        raw_detail = self["detail"]
        return S3EventBridgeNotificationDetail(raw_detail)
Ancestors
- EventBridgeEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var detail : S3EventBridgeNotificationDetail
-
S3 notification details
Expand source code
@property
def detail(self) -> S3EventBridgeNotificationDetail:  # type: ignore[override]
    """S3 notification details"""
    raw_detail = self["detail"]
    return S3EventBridgeNotificationDetail(raw_detail)
Inherited members
class SESEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Amazon SES to receive message event trigger
NOTE: There is a 30-second timeout on RequestResponse invocations.
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/services-ses.html
- https://docs.aws.amazon.com/ses/latest/DeveloperGuide/receiving-email-action-lambda.html
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class SESEvent(DictWrapper):
    """Amazon SES to receive message event trigger

    NOTE: There is a 30-second timeout on RequestResponse invocations.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-ses.html
    - https://docs.aws.amazon.com/ses/latest/DeveloperGuide/receiving-email-action-lambda.html
    """

    @property
    def records(self) -> Iterator[SESEventRecord]:
        """Yield each entry of ``Records`` wrapped in an ``SESEventRecord``."""
        yield from map(SESEventRecord, self["Records"])

    @property
    def record(self) -> SESEventRecord:
        """Return the first SES event record"""
        record_iter = self.records
        return next(record_iter)

    @property
    def mail(self) -> SESMail:
        """Return ``ses.mail`` of the first SES event record"""
        ses_payload = self.record.ses
        return ses_payload.mail

    @property
    def receipt(self) -> SESReceipt:
        """Return ``ses.receipt`` of the first SES event record"""
        ses_payload = self.record.ses
        return ses_payload.receipt
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var mail : SESMail
-
Expand source code
@property
def mail(self) -> SESMail:
    """Return ``ses.mail`` of ``self.record``"""
    ses_payload = self.record.ses
    return ses_payload.mail
var receipt : SESReceipt
-
Expand source code
@property
def receipt(self) -> SESReceipt:
    """Return ``ses.receipt`` of ``self.record``"""
    ses_payload = self.record.ses
    return ses_payload.receipt
var record : SESEventRecord
-
Expand source code
@property
def record(self) -> SESEventRecord:
    """Return the first item produced by ``self.records``"""
    record_iter = self.records
    return next(record_iter)
var records : Iterator[SESEventRecord]
-
Expand source code
@property
def records(self) -> Iterator[SESEventRecord]:
    """Yield each entry of ``Records`` wrapped in an ``SESEventRecord``."""
    yield from map(SESEventRecord, self["Records"])
Inherited members
class SNSEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
SNS Event
Documentation:
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class SNSEvent(DictWrapper):
    """SNS Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html
    """

    @property
    def records(self) -> Iterator[SNSEventRecord]:
        """Yield each entry of ``Records`` wrapped in an ``SNSEventRecord``."""
        yield from map(SNSEventRecord, self["Records"])

    @property
    def record(self) -> SNSEventRecord:
        """Return the first SNS event record"""
        record_iter = self.records
        return next(record_iter)

    @property
    def sns_message(self) -> str:
        """Return the message for the first sns event record"""
        sns_entry = self.record.sns
        return sns_entry.message
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var record : SNSEventRecord
-
Return the first SNS event record
Expand source code
@property
def record(self) -> SNSEventRecord:
    """Return the first SNS event record"""
    record_iter = self.records
    return next(record_iter)
var records : Iterator[SNSEventRecord]
-
Expand source code
@property
def records(self) -> Iterator[SNSEventRecord]:
    """Yield each entry of ``Records`` wrapped in an ``SNSEventRecord``."""
    yield from map(SNSEventRecord, self["Records"])
var sns_message : str
-
Return the message for the first sns event record
Expand source code
@property
def sns_message(self) -> str:
    """Return the message for the first sns event record"""
    sns_entry = self.record.sns
    return sns_entry.message
Inherited members
class SQSEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
SQS Event
Documentation:
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class SQSEvent(DictWrapper):
    """SQS Event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
    """

    @property
    def records(self) -> Iterator[SQSRecord]:
        """Yield each entry of ``Records`` wrapped in an ``SQSRecord``."""
        yield from (
            SQSRecord(data=raw, json_deserializer=self._json_deserializer)
            for raw in self["Records"]
        )
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var records : Iterator[SQSRecord]
-
Expand source code
@property
def records(self) -> Iterator[SQSRecord]:
    """Yield each entry of ``Records`` wrapped in an ``SQSRecord``."""
    yield from (
        SQSRecord(data=raw, json_deserializer=self._json_deserializer)
        for raw in self["Records"]
    )
Inherited members
class SecretsManagerEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class SecretsManagerEvent(DictWrapper):
    """Event exposing the ``SecretId``, ``ClientRequestToken`` and ``Step`` keys."""

    @property
    def secret_id(self) -> str:
        """SecretId: The secret ARN or identifier"""
        identifier: str = self["SecretId"]
        return identifier

    @property
    def client_request_token(self) -> str:
        """ClientRequestToken: The ClientRequestToken associated with the secret version"""
        token: str = self["ClientRequestToken"]
        return token

    @property
    def version_id(self) -> str:
        """Alias to ClientRequestToken to get token associated to version"""
        token: str = self["ClientRequestToken"]
        return token

    @property
    def step(self) -> Literal["createSecret", "setSecret", "testSecret", "finishSecret"]:
        """Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)"""
        rotation_step = self["Step"]
        return rotation_step
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var client_request_token : str
-
ClientRequestToken: The ClientRequestToken associated with the secret version
Expand source code
@property
def client_request_token(self) -> str:
    """ClientRequestToken: The ClientRequestToken associated with the secret version"""
    token: str = self["ClientRequestToken"]
    return token
var secret_id : str
-
SecretId: The secret ARN or identifier
Expand source code
@property
def secret_id(self) -> str:
    """SecretId: The secret ARN or identifier"""
    identifier: str = self["SecretId"]
    return identifier
var step : Literal['createSecret', 'setSecret', 'testSecret', 'finishSecret']
-
Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)
Expand source code
@property
def step(self) -> Literal["createSecret", "setSecret", "testSecret", "finishSecret"]:
    """Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)"""
    rotation_step = self["Step"]
    return rotation_step
var version_id : str
-
Alias to ClientRequestToken to get token associated to version
Expand source code
@property
def version_id(self) -> str:
    """Alias to ClientRequestToken to get token associated to version"""
    token: str = self["ClientRequestToken"]
    return token
Inherited members
class VPCLatticeEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class VPCLatticeEvent(VPCLatticeEventBase):
    """VPC Lattice event exposing ``raw_path``, ``is_base64_encoded`` and ``query_string_parameters``."""

    @property
    def raw_path(self) -> str:
        """The raw VPC Lattice request path."""
        request_path: str = self["raw_path"]
        return request_path

    @property
    def is_base64_encoded(self) -> bool:
        """A boolean flag to indicate if the applicable request payload is Base64-encoded"""
        encoded_flag: bool = self["is_base64_encoded"]
        return encoded_flag

    # VPCLattice event has no path field
    # Added here for consistency with the BaseProxyEvent class
    @property
    def path(self) -> str:
        """Value of the ``raw_path`` key."""
        request_path: str = self["raw_path"]
        return request_path

    @property
    def query_string_parameters(self) -> Dict[str, str]:
        """The request query string parameters."""
        params: Dict[str, str] = self["query_string_parameters"]
        return params
Ancestors
- VPCLatticeEventBase
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var is_base64_encoded : bool
-
A boolean flag to indicate if the applicable request payload is Base64-encoded
Expand source code
@property
def is_base64_encoded(self) -> bool:
    """A boolean flag to indicate if the applicable request payload is Base64-encoded"""
    encoded_flag: bool = self["is_base64_encoded"]
    return encoded_flag
var path : str
-
Expand source code
@property
def path(self) -> str:
    """Value of the ``raw_path`` key."""
    request_path: str = self["raw_path"]
    return request_path
var query_string_parameters : Dict[str, str]
-
The request query string parameters.
Expand source code
@property
def query_string_parameters(self) -> Dict[str, str]:
    """The request query string parameters."""
    params: Dict[str, str] = self["query_string_parameters"]
    return params
var raw_path : str
-
The raw VPC Lattice request path.
Expand source code
@property
def raw_path(self) -> str:
    """The raw VPC Lattice request path."""
    request_path: str = self["raw_path"]
    return request_path
Inherited members
class VPCLatticeEventV2 (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class VPCLatticeEventV2(VPCLatticeEventBase):
    """Read-only accessors for a VPC Lattice v2 event payload that uses camelCase keys."""

    @property
    def version(self) -> str:
        """Version of the VPC Lattice v2 event."""
        event_version = self["version"]
        return event_version

    @property
    def path(self) -> str:
        """Path of the VPC Lattice v2 event."""
        event_path = self["path"]
        return event_path

    @property
    def request_context(self) -> vpcLatticeEventV2RequestContext:
        """Request context of the VPC Lattice v2 event, wrapped in its own data class."""
        raw_context = self["requestContext"]
        return vpcLatticeEventV2RequestContext(raw_context)

    @property
    def is_base64_encoded(self) -> Optional[bool]:
        """Flag telling whether the applicable request payload is Base64-encoded.

        Read with ``get`` from the ``isBase64Encoded`` key rather than by
        indexing, which is why the return annotation is optional.
        """
        encoded_flag = self.get("isBase64Encoded")
        return encoded_flag

    @property
    def query_string_parameters(self) -> Optional[Dict[str, str]]:
        """Query string parameters of the request.

        Read with ``get`` from the ``queryStringParameters`` key rather than by
        indexing, which is why the return annotation is optional.
        """
        params = self.get("queryStringParameters")
        return params
Ancestors
- VPCLatticeEventBase
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
var is_base64_encoded : Optional[bool]
-
A boolean flag to indicate if the applicable request payload is Base64-encoded
Expand source code
@property
def is_base64_encoded(self) -> Optional[bool]:
    """Flag telling whether the applicable request payload is Base64-encoded.

    Read with ``get`` from the ``isBase64Encoded`` key rather than by
    indexing, which is why the return annotation is optional.
    """
    encoded_flag = self.get("isBase64Encoded")
    return encoded_flag
var path : str
-
The VPC Lattice v2 Event path
Expand source code
@property
def path(self) -> str:
    """Path of the VPC Lattice v2 event."""
    event_path = self["path"]
    return event_path
var query_string_parameters : Optional[Dict[str, str]]
-
The request query string parameters.
Expand source code
@property
def query_string_parameters(self) -> Optional[Dict[str, str]]:
    """Query string parameters of the request.

    Read with ``get`` from the ``queryStringParameters`` key rather than by
    indexing, which is why the return annotation is optional.
    """
    params = self.get("queryStringParameters")
    return params
var request_context : vpcLatticeEventV2RequestContext
-
The VPC Lattice v2 Event request context.
Expand source code
@property
def request_context(self) -> vpcLatticeEventV2RequestContext:
    """Request context of the VPC Lattice v2 event, wrapped in its own data class."""
    raw_context = self["requestContext"]
    return vpcLatticeEventV2RequestContext(raw_context)
var version : str
-
The VPC Lattice v2 Event version
Expand source code
@property
def version(self) -> str:
    """Version of the VPC Lattice v2 event."""
    event_version = self["version"]
    return event_version
Inherited members