Module aws_lambda_powertools.utilities.data_classes
Event Source Data Classes utility provides classes self-describing Lambda event sources.
Sub-modules
aws_lambda_powertools.utilities.data_classes.active_mq_event
aws_lambda_powertools.utilities.data_classes.alb_event
aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event
aws_lambda_powertools.utilities.data_classes.api_gateway_proxy_event
aws_lambda_powertools.utilities.data_classes.appsync
aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event
aws_lambda_powertools.utilities.data_classes.appsync_resolver_event
aws_lambda_powertools.utilities.data_classes.aws_config_rule_event
aws_lambda_powertools.utilities.data_classes.bedrock_agent_event
aws_lambda_powertools.utilities.data_classes.cloud_watch_alarm_event
aws_lambda_powertools.utilities.data_classes.cloud_watch_custom_widget_event
aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event
aws_lambda_powertools.utilities.data_classes.cloudformation_custom_resource_event
aws_lambda_powertools.utilities.data_classes.code_deploy_lifecycle_hook_event
aws_lambda_powertools.utilities.data_classes.code_pipeline_job_event
aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event
aws_lambda_powertools.utilities.data_classes.common
aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event
aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event
aws_lambda_powertools.utilities.data_classes.event_bridge_event
aws_lambda_powertools.utilities.data_classes.kafka_event
aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event
aws_lambda_powertools.utilities.data_classes.kinesis_stream_event
aws_lambda_powertools.utilities.data_classes.lambda_function_url_event
aws_lambda_powertools.utilities.data_classes.rabbit_mq_event
aws_lambda_powertools.utilities.data_classes.s3_batch_operation_event
aws_lambda_powertools.utilities.data_classes.s3_event
aws_lambda_powertools.utilities.data_classes.s3_object_event
aws_lambda_powertools.utilities.data_classes.secrets_manager_event
aws_lambda_powertools.utilities.data_classes.ses_event
aws_lambda_powertools.utilities.data_classes.shared_functions
aws_lambda_powertools.utilities.data_classes.sns_event
aws_lambda_powertools.utilities.data_classes.sqs_event
aws_lambda_powertools.utilities.data_classes.vpc_lattice
Functions
def event_source(handler: Callable[[Any, LambdaContext], Any], event: dict[str, Any], context: LambdaContext, data_class: type[DictWrapper])
-
Middleware to create an instance of the passed in event source data class
Parameters
handler
:Callable
- Lambda's handler
event
:dict[str, Any]
- Lambda's Event
context
:LambdaContext
- Lambda's Context
data_class
:type[DictWrapper]
- Data class type to instantiate
Example
Sample usage
from aws_lambda_powertools.utilities.data_classes import S3Event, event_source @event_source(data_class=S3Event) def handler(event: S3Event, context): return {"key": event.object_key}
Classes
class ALBEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Application load balancer event
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/services-alb.html
- https://docs.aws.amazon.com/elasticloadbalancing/latest/application/lambda-functions.html
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class ALBEvent(BaseProxyEvent): """Application load balancer event Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/services-alb.html - https://docs.aws.amazon.com/elasticloadbalancing/latest/application/lambda-functions.html """ @property def request_context(self) -> ALBEventRequestContext: return ALBEventRequestContext(self._data) @property def multi_value_query_string_parameters(self) -> dict[str, list[str]]: return self.get("multiValueQueryStringParameters") or {} @property def resolved_query_string_parameters(self) -> dict[str, list[str]]: return self.multi_value_query_string_parameters or super().resolved_query_string_parameters @property def multi_value_headers(self) -> dict[str, list[str]]: return CaseInsensitiveDict(self.get("multiValueHeaders")) @property def resolved_headers_field(self) -> dict[str, Any]: return self.multi_value_headers or self.headers def header_serializer(self) -> BaseHeadersSerializer: # When using the ALB integration, the `multiValueHeaders` feature can be disabled (default) or enabled. # We can determine if the feature is enabled by looking if the event has a `multiValueHeaders` key. if self.multi_value_headers: return MultiValueHeadersSerializer() return SingleValueHeadersSerializer()
Ancestors
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop multi_value_headers : dict[str, list[str]]
-
Expand source code
@property def multi_value_headers(self) -> dict[str, list[str]]: return CaseInsensitiveDict(self.get("multiValueHeaders"))
prop multi_value_query_string_parameters : dict[str, list[str]]
-
Expand source code
@property def multi_value_query_string_parameters(self) -> dict[str, list[str]]: return self.get("multiValueQueryStringParameters") or {}
prop request_context : ALBEventRequestContext
-
Expand source code
@property def request_context(self) -> ALBEventRequestContext: return ALBEventRequestContext(self._data)
Methods
def header_serializer(self) ‑> BaseHeadersSerializer
Inherited members
class APIGatewayProxyEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
AWS Lambda proxy V1
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class APIGatewayProxyEvent(BaseProxyEvent): """AWS Lambda proxy V1 Documentation: -------------- - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html """ @property def version(self) -> str: return self["version"] @property def resource(self) -> str: return self["resource"] @property def multi_value_headers(self) -> dict[str, list[str]]: return CaseInsensitiveDict(self.get("multiValueHeaders")) @property def multi_value_query_string_parameters(self) -> dict[str, list[str]]: return self.get("multiValueQueryStringParameters") or {} # key might exist but can be `null` @property def resolved_query_string_parameters(self) -> dict[str, list[str]]: if self.multi_value_query_string_parameters: return self.multi_value_query_string_parameters return super().resolved_query_string_parameters @property def resolved_headers_field(self) -> dict[str, Any]: return self.multi_value_headers or self.headers @property def request_context(self) -> APIGatewayEventRequestContext: return APIGatewayEventRequestContext(self._data) @property def path_parameters(self) -> dict[str, str]: return self.get("pathParameters") or {} @property def stage_variables(self) -> dict[str, str]: return self.get("stageVariables") or {} def header_serializer(self) -> BaseHeadersSerializer: return MultiValueHeadersSerializer()
Ancestors
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop multi_value_headers : dict[str, list[str]]
-
Expand source code
@property def multi_value_headers(self) -> dict[str, list[str]]: return CaseInsensitiveDict(self.get("multiValueHeaders"))
prop multi_value_query_string_parameters : dict[str, list[str]]
-
Expand source code
@property def multi_value_query_string_parameters(self) -> dict[str, list[str]]: return self.get("multiValueQueryStringParameters") or {} # key might exist but can be `null`
prop path_parameters : dict[str, str]
-
Expand source code
@property def path_parameters(self) -> dict[str, str]: return self.get("pathParameters") or {}
prop request_context : APIGatewayEventRequestContext
-
Expand source code
@property def request_context(self) -> APIGatewayEventRequestContext: return APIGatewayEventRequestContext(self._data)
prop resource : str
-
Expand source code
@property def resource(self) -> str: return self["resource"]
prop stage_variables : dict[str, str]
-
Expand source code
@property def stage_variables(self) -> dict[str, str]: return self.get("stageVariables") or {}
prop version : str
-
Expand source code
@property def version(self) -> str: return self["version"]
Methods
def header_serializer(self) ‑> BaseHeadersSerializer
Inherited members
class APIGatewayProxyEventV2 (data: dict[str, Any], json_deserializer: Callable | None = None)
-
AWS Lambda proxy V2 event
Notes:
Format 2.0 doesn't have multiValueHeaders or multiValueQueryStringParameters fields. Duplicate headers are combined with commas and included in the headers field. Duplicate query strings are combined with commas and included in the queryStringParameters field.
Format 2.0 includes a new cookies field. All cookie headers in the request are combined with commas and added to the cookies field. In the response to the client, each cookie becomes a set-cookie header.
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class APIGatewayProxyEventV2(BaseProxyEvent): """AWS Lambda proxy V2 event Notes: ----- Format 2.0 doesn't have multiValueHeaders or multiValueQueryStringParameters fields. Duplicate headers are combined with commas and included in the headers field. Duplicate query strings are combined with commas and included in the queryStringParameters field. Format 2.0 includes a new cookies field. All cookie headers in the request are combined with commas and added to the cookies field. In the response to the client, each cookie becomes a set-cookie header. Documentation: -------------- - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html """ @property def version(self) -> str: return self["version"] @property def route_key(self) -> str: return self["routeKey"] @property def raw_path(self) -> str: return self["rawPath"] @property def raw_query_string(self) -> str: return self["rawQueryString"] @property def cookies(self) -> list[str]: return self.get("cookies") or [] @property def request_context(self) -> RequestContextV2: return RequestContextV2(self._data) @property def path_parameters(self) -> dict[str, str]: return self.get("pathParameters") or {} @property def stage_variables(self) -> dict[str, str]: return self.get("stageVariables") or {} @property def path(self) -> str: stage = self.request_context.stage if stage != "$default": return self.raw_path[len("/" + stage) :] return self.raw_path @property def http_method(self) -> str: """The HTTP method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT.""" return self.request_context.http.method def header_serializer(self): return HttpApiHeadersSerializer() @cached_property def resolved_headers_field(self) -> dict[str, Any]: return CaseInsensitiveDict((k, v.split(",") if "," in v else v) for k, v in self.headers.items())
Ancestors
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Subclasses
Instance variables
-
Expand source code
@property def cookies(self) -> list[str]: return self.get("cookies") or []
prop path : str
-
Expand source code
@property def path(self) -> str: stage = self.request_context.stage if stage != "$default": return self.raw_path[len("/" + stage) :] return self.raw_path
prop path_parameters : dict[str, str]
-
Expand source code
@property def path_parameters(self) -> dict[str, str]: return self.get("pathParameters") or {}
prop raw_path : str
-
Expand source code
@property def raw_path(self) -> str: return self["rawPath"]
prop raw_query_string : str
-
Expand source code
@property def raw_query_string(self) -> str: return self["rawQueryString"]
prop request_context : RequestContextV2
-
Expand source code
@property def request_context(self) -> RequestContextV2: return RequestContextV2(self._data)
prop route_key : str
-
Expand source code
@property def route_key(self) -> str: return self["routeKey"]
prop stage_variables : dict[str, str]
-
Expand source code
@property def stage_variables(self) -> dict[str, str]: return self.get("stageVariables") or {}
prop version : str
-
Expand source code
@property def version(self) -> str: return self["version"]
Methods
def header_serializer(self)
Inherited members
class AWSConfigRuleEvent (data: dict[str, Any])
-
Events for AWS Config Rules Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class AWSConfigRuleEvent(DictWrapper): """Events for AWS Config Rules Documentation: -------------- - https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_lambda-functions.html """ def __init__(self, data: dict[str, Any]): super().__init__(data) self._invoking_event: Any | None = None self._rule_parameters: Any | None = None @property def version(self) -> str: """The version of the event.""" return self["version"] @property def invoking_event( self, ) -> AWSConfigConfigurationChanged | AWSConfigScheduledNotification | AWSConfigOversizedConfiguration: """The invoking payload of the event.""" if self._invoking_event is None: self._invoking_event = self._json_deserializer(self["invokingEvent"]) return get_invoke_event(self._invoking_event) @property def raw_invoking_event(self) -> str: """The raw invoking payload of the event.""" return self["invokingEvent"] @property def rule_parameters(self) -> dict: """The parameters of the event.""" if self._rule_parameters is None: self._rule_parameters = self._json_deserializer(self["ruleParameters"]) return self._rule_parameters @property def result_token(self) -> str: """The result token of the event.""" return self["resultToken"] @property def event_left_scope(self) -> bool: """The left scope of the event.""" return self["eventLeftScope"] @property def execution_role_arn(self) -> str: """The execution role arn of the event.""" return self["executionRoleArn"] @property def config_rule_arn(self) -> str: """The arn of the rule of the event.""" return self["configRuleArn"] @property def config_rule_name(self) -> str: """The name of the rule of the event.""" return self["configRuleName"] @property def config_rule_id(self) -> str: """The id of the rule of the event.""" return self["configRuleId"] @property def accountid(self) -> str: """The accountid of the event.""" return self["accountId"] @property def evalution_mode(self) -> str | None: """The evalution mode of the event.""" return self.get("evaluationMode")
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop accountid : str
-
The accountid of the event.
Expand source code
@property def accountid(self) -> str: """The accountid of the event.""" return self["accountId"]
prop config_rule_arn : str
-
The arn of the rule of the event.
Expand source code
@property def config_rule_arn(self) -> str: """The arn of the rule of the event.""" return self["configRuleArn"]
prop config_rule_id : str
-
The id of the rule of the event.
Expand source code
@property def config_rule_id(self) -> str: """The id of the rule of the event.""" return self["configRuleId"]
prop config_rule_name : str
-
The name of the rule of the event.
Expand source code
@property def config_rule_name(self) -> str: """The name of the rule of the event.""" return self["configRuleName"]
prop evalution_mode : str | None
-
The evalution mode of the event.
Expand source code
@property def evalution_mode(self) -> str | None: """The evalution mode of the event.""" return self.get("evaluationMode")
prop event_left_scope : bool
-
The left scope of the event.
Expand source code
@property def event_left_scope(self) -> bool: """The left scope of the event.""" return self["eventLeftScope"]
prop execution_role_arn : str
-
The execution role arn of the event.
Expand source code
@property def execution_role_arn(self) -> str: """The execution role arn of the event.""" return self["executionRoleArn"]
prop invoking_event : AWSConfigConfigurationChanged | AWSConfigScheduledNotification | AWSConfigOversizedConfiguration
-
The invoking payload of the event.
Expand source code
@property def invoking_event( self, ) -> AWSConfigConfigurationChanged | AWSConfigScheduledNotification | AWSConfigOversizedConfiguration: """The invoking payload of the event.""" if self._invoking_event is None: self._invoking_event = self._json_deserializer(self["invokingEvent"]) return get_invoke_event(self._invoking_event)
prop raw_invoking_event : str
-
The raw invoking payload of the event.
Expand source code
@property def raw_invoking_event(self) -> str: """The raw invoking payload of the event.""" return self["invokingEvent"]
prop result_token : str
-
The result token of the event.
Expand source code
@property def result_token(self) -> str: """The result token of the event.""" return self["resultToken"]
prop rule_parameters : dict
-
The parameters of the event.
Expand source code
@property def rule_parameters(self) -> dict: """The parameters of the event.""" if self._rule_parameters is None: self._rule_parameters = self._json_deserializer(self["ruleParameters"]) return self._rule_parameters
prop version : str
-
The version of the event.
Expand source code
@property def version(self) -> str: """The version of the event.""" return self["version"]
Inherited members
class AppSyncResolverEvent (data: dict)
-
AppSync resolver event
NOTE: AppSync Resolver Events can come in various shapes this data class supports both Amplify GraphQL directive @function and Direct Lambda Resolver
Documentation:
- https://docs.aws.amazon.com/appsync/latest/devguide/resolver-context-reference.html
- https://docs.amplify.aws/cli/graphql-transformer/function#structure-of-the-function-event
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class AppSyncResolverEvent(DictWrapper): """AppSync resolver event **NOTE:** AppSync Resolver Events can come in various shapes this data class supports both Amplify GraphQL directive @function and Direct Lambda Resolver Documentation: ------------- - https://docs.aws.amazon.com/appsync/latest/devguide/resolver-context-reference.html - https://docs.amplify.aws/cli/graphql-transformer/function#structure-of-the-function-event """ def __init__(self, data: dict): super().__init__(data) info: dict | None = data.get("info") if not info: parent_type_name = self.get("parentTypeName") or self.get("typeName") info = {"fieldName": self.get("fieldName"), "parentTypeName": parent_type_name} self._info = AppSyncResolverEventInfo(info) @property def type_name(self) -> str: """The name of the parent type for the field that is currently being resolved.""" return self.info.parent_type_name @property def field_name(self) -> str: """The name of the field that is currently being resolved.""" return self.info.field_name @property def arguments(self) -> dict[str, Any]: """A map that contains all GraphQL arguments for this field.""" return self["arguments"] @property def identity(self) -> AppSyncIdentityIAM | AppSyncIdentityCognito | None: """An object that contains information about the caller. Depending on the type of identify found: - API_KEY authorization - returns None - AWS_IAM authorization - returns AppSyncIdentityIAM - AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito """ return get_identity_object(self.get("identity")) @property def source(self) -> dict[str, Any]: """A map that contains the resolution of the parent field.""" return self.get("source") or {} @property def request_headers(self) -> dict[str, str]: """Request headers""" return CaseInsensitiveDict(self["request"]["headers"]) @property def prev_result(self) -> dict[str, Any] | None: """It represents the result of whatever previous operation was executed in a pipeline resolver.""" prev = self.get("prev") if not prev: return None return prev.get("result") @property def info(self) -> AppSyncResolverEventInfo: """The info section contains information about the GraphQL request.""" return self._info @property def stash(self) -> dict: """The stash is a map that is made available inside each resolver and function mapping template. The same stash instance lives through a single resolver execution. This means that you can use the stash to pass arbitrary data across request and response mapping templates, and across functions in a pipeline resolver.""" return self.get("stash") or {} @overload def get_header_value( self, name: str, default_value: str, case_sensitive: bool = False, ) -> str: ... @overload def get_header_value( self, name: str, default_value: str | None = None, case_sensitive: bool = False, ) -> str | None: ... @deprecated( "`get_header_value` function is deprecated; Access headers directly using event.headers.get('HeaderName')", category=None, ) def get_header_value( self, name: str, default_value: str | None = None, case_sensitive: bool = False, ) -> str | None: """Get header value by name Parameters ---------- name: str Header name default_value: str, optional Default value if no value was found by name case_sensitive: bool Whether to use a case-sensitive look up Returns ------- str, optional Header value """ warnings.warn( "The `get_header_value` function is deprecated in V3 and the `case_sensitive` parameter " "no longer has any effect. This function will be removed in the next major version. " "Instead, access headers directly using event.headers.get('HeaderName'), which is case insensitive.", category=PowertoolsDeprecationWarning, stacklevel=2, ) return get_header_value(self.request_headers, name, default_value, case_sensitive)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop arguments : dict[str, Any]
-
A map that contains all GraphQL arguments for this field.
Expand source code
@property def arguments(self) -> dict[str, Any]: """A map that contains all GraphQL arguments for this field.""" return self["arguments"]
prop field_name : str
-
The name of the field that is currently being resolved.
Expand source code
@property def field_name(self) -> str: """The name of the field that is currently being resolved.""" return self.info.field_name
prop identity : AppSyncIdentityIAM | AppSyncIdentityCognito | None
-
An object that contains information about the caller.
Depending on the type of identify found:
- API_KEY authorization - returns None
- AWS_IAM authorization - returns AppSyncIdentityIAM
- AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito
Expand source code
@property def identity(self) -> AppSyncIdentityIAM | AppSyncIdentityCognito | None: """An object that contains information about the caller. Depending on the type of identify found: - API_KEY authorization - returns None - AWS_IAM authorization - returns AppSyncIdentityIAM - AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito """ return get_identity_object(self.get("identity"))
prop info : AppSyncResolverEventInfo
-
The info section contains information about the GraphQL request.
Expand source code
@property def info(self) -> AppSyncResolverEventInfo: """The info section contains information about the GraphQL request.""" return self._info
prop prev_result : dict[str, Any] | None
-
It represents the result of whatever previous operation was executed in a pipeline resolver.
Expand source code
@property def prev_result(self) -> dict[str, Any] | None: """It represents the result of whatever previous operation was executed in a pipeline resolver.""" prev = self.get("prev") if not prev: return None return prev.get("result")
prop request_headers : dict[str, str]
-
Request headers
Expand source code
@property def request_headers(self) -> dict[str, str]: """Request headers""" return CaseInsensitiveDict(self["request"]["headers"])
prop source : dict[str, Any]
-
A map that contains the resolution of the parent field.
Expand source code
@property def source(self) -> dict[str, Any]: """A map that contains the resolution of the parent field.""" return self.get("source") or {}
prop stash : dict
-
The stash is a map that is made available inside each resolver and function mapping template. The same stash instance lives through a single resolver execution. This means that you can use the stash to pass arbitrary data across request and response mapping templates, and across functions in a pipeline resolver.
Expand source code
@property def stash(self) -> dict: """The stash is a map that is made available inside each resolver and function mapping template. The same stash instance lives through a single resolver execution. This means that you can use the stash to pass arbitrary data across request and response mapping templates, and across functions in a pipeline resolver.""" return self.get("stash") or {}
prop type_name : str
-
The name of the parent type for the field that is currently being resolved.
Expand source code
@property def type_name(self) -> str: """The name of the parent type for the field that is currently being resolved.""" return self.info.parent_type_name
Methods
def get_header_value(self, name: str, default_value: str | None = None, case_sensitive: bool = False) ‑> str | None
-
Get header value by name Parameters
name
:str
- Header name
default_value
:str
, optional- Default value if no value was found by name
case_sensitive
:bool
- Whether to use a case-sensitive look up
Returns
str
, optional- Header value
Inherited members
class BedrockAgentEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Bedrock Agent input event
See https://docs.aws.amazon.com/bedrock/latest/userguide/agents-create.html
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class BedrockAgentEvent(BaseProxyEvent): """ Bedrock Agent input event See https://docs.aws.amazon.com/bedrock/latest/userguide/agents-create.html """ @property def message_version(self) -> str: return self["messageVersion"] @property def input_text(self) -> str: return self["inputText"] @property def session_id(self) -> str: return self["sessionId"] @property def action_group(self) -> str: return self["actionGroup"] @property def api_path(self) -> str: return self["apiPath"] @property def http_method(self) -> str: return self["httpMethod"] @property def parameters(self) -> list[BedrockAgentProperty]: parameters = self.get("parameters") or [] return [BedrockAgentProperty(x) for x in parameters] @property def request_body(self) -> BedrockAgentRequestBody | None: return BedrockAgentRequestBody(self["requestBody"]) if self.get("requestBody") else None @property def agent(self) -> BedrockAgentInfo: return BedrockAgentInfo(self["agent"]) @property def session_attributes(self) -> dict[str, str]: return self["sessionAttributes"] @property def prompt_session_attributes(self) -> dict[str, str]: return self["promptSessionAttributes"] # The following methods add compatibility with BaseProxyEvent @property def path(self) -> str: return self["apiPath"] @cached_property def query_string_parameters(self) -> dict[str, str]: # In Bedrock Agent events, query string parameters are passed as undifferentiated parameters, # together with the other parameters. So we just return all parameters here. parameters = self.get("parameters") or [] return {x["name"]: x["value"] for x in parameters} @property def resolved_headers_field(self) -> dict[str, Any]: return {} @cached_property def json_body(self) -> Any: # In Bedrock Agent events, body parameters are encoded differently # @see https://docs.aws.amazon.com/bedrock/latest/userguide/agents-lambda.html#agents-lambda-input if not self.request_body: return None json_body = self.request_body.content.get("application/json") if not json_body: return None return {x.name: x.value for x in json_body.properties}
Ancestors
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop action_group : str
-
Expand source code
@property def action_group(self) -> str: return self["actionGroup"]
prop agent : BedrockAgentInfo
-
Expand source code
@property def agent(self) -> BedrockAgentInfo: return BedrockAgentInfo(self["agent"])
prop api_path : str
-
Expand source code
@property def api_path(self) -> str: return self["apiPath"]
prop input_text : str
-
Expand source code
@property def input_text(self) -> str: return self["inputText"]
prop message_version : str
-
Expand source code
@property def message_version(self) -> str: return self["messageVersion"]
prop parameters : list[BedrockAgentProperty]
-
Expand source code
@property def parameters(self) -> list[BedrockAgentProperty]: parameters = self.get("parameters") or [] return [BedrockAgentProperty(x) for x in parameters]
prop path : str
-
Expand source code
@property def path(self) -> str: return self["apiPath"]
prop prompt_session_attributes : dict[str, str]
-
Expand source code
@property def prompt_session_attributes(self) -> dict[str, str]: return self["promptSessionAttributes"]
var query_string_parameters
-
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val
prop request_body : BedrockAgentRequestBody | None
-
Expand source code
@property def request_body(self) -> BedrockAgentRequestBody | None: return BedrockAgentRequestBody(self["requestBody"]) if self.get("requestBody") else None
prop session_attributes : dict[str, str]
-
Expand source code
@property def session_attributes(self) -> dict[str, str]: return self["sessionAttributes"]
prop session_id : str
-
Expand source code
@property def session_id(self) -> str: return self["sessionId"]
Inherited members
class CloudFormationCustomResourceEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class CloudFormationCustomResourceEvent(DictWrapper): @property def request_type(self) -> Literal["Create", "Update", "Delete"]: return self["RequestType"] @property def service_token(self) -> str: return self["ServiceToken"] @property def response_url(self) -> str: return self["ResponseURL"] @property def stack_id(self) -> str: return self["StackId"] @property def request_id(self) -> str: return self["RequestId"] @property def logical_resource_id(self) -> str: return self["LogicalResourceId"] @property def physical_resource_id(self) -> str: return self.get("PhysicalResourceId") or "" @property def resource_type(self) -> str: return self["ResourceType"] @property def resource_properties(self) -> dict[str, Any]: return self.get("ResourceProperties") or {} @property def old_resource_properties(self) -> dict[str, Any]: return self.get("OldResourceProperties") or {}
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop logical_resource_id : str
-
Expand source code
@property def logical_resource_id(self) -> str: return self["LogicalResourceId"]
prop old_resource_properties : dict[str, Any]
-
Expand source code
@property def old_resource_properties(self) -> dict[str, Any]: return self.get("OldResourceProperties") or {}
prop physical_resource_id : str
-
Expand source code
@property def physical_resource_id(self) -> str: return self.get("PhysicalResourceId") or ""
prop request_id : str
-
Expand source code
@property def request_id(self) -> str: return self["RequestId"]
prop request_type : Literal['Create', 'Update', 'Delete']
-
Expand source code
@property def request_type(self) -> Literal["Create", "Update", "Delete"]: return self["RequestType"]
prop resource_properties : dict[str, Any]
-
Expand source code
@property def resource_properties(self) -> dict[str, Any]: return self.get("ResourceProperties") or {}
prop resource_type : str
-
Expand source code
@property def resource_type(self) -> str: return self["ResourceType"]
prop response_url : str
-
Expand source code
@property def response_url(self) -> str: return self["ResponseURL"]
prop service_token : str
-
Expand source code
@property def service_token(self) -> str: return self["ServiceToken"]
prop stack_id : str
-
Expand source code
@property def stack_id(self) -> str: return self["StackId"]
Inherited members
class CloudWatchAlarmConfiguration (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class CloudWatchAlarmConfiguration(DictWrapper): @property def description(self) -> str | None: """ Optional description for the Alarm. """ return self.get("description", None) @property def alarm_rule(self) -> str | None: """ Optional description for the Alarm rule in case of composite alarm. """ return self.get("alarmRule", None) @property def alarm_actions_suppressor(self) -> str | None: """ Optional action suppression for the Alarm rule in case of composite alarm. """ return self.get("actionsSuppressor", None) @property def alarm_actions_suppressor_wait_period(self) -> str | None: """ Optional action suppression wait period for the Alarm rule in case of composite alarm. """ return self.get("actionsSuppressorWaitPeriod", None) @property def alarm_actions_suppressor_extension_period(self) -> str | None: """ Optional action suppression extension period for the Alarm rule in case of composite alarm. """ return self.get("actionsSuppressorExtensionPeriod", None) @property def metrics(self) -> list[CloudWatchAlarmMetric]: """ The metrics evaluated for the Alarm. """ metrics = self.get("metrics") or [] return [CloudWatchAlarmMetric(i) for i in metrics]
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop alarm_actions_suppressor : str | None
-
Optional action suppression for the Alarm rule in case of composite alarm.
Expand source code
@property def alarm_actions_suppressor(self) -> str | None: """ Optional action suppression for the Alarm rule in case of composite alarm. """ return self.get("actionsSuppressor", None)
prop alarm_actions_suppressor_extension_period : str | None
-
Optional action suppression extension period for the Alarm rule in case of composite alarm.
Expand source code
@property def alarm_actions_suppressor_extension_period(self) -> str | None: """ Optional action suppression extension period for the Alarm rule in case of composite alarm. """ return self.get("actionsSuppressorExtensionPeriod", None)
prop alarm_actions_suppressor_wait_period : str | None
-
Optional action suppression wait period for the Alarm rule in case of composite alarm.
Expand source code
@property def alarm_actions_suppressor_wait_period(self) -> str | None: """ Optional action suppression wait period for the Alarm rule in case of composite alarm. """ return self.get("actionsSuppressorWaitPeriod", None)
prop alarm_rule : str | None
-
Optional description for the Alarm rule in case of composite alarm.
Expand source code
@property def alarm_rule(self) -> str | None: """ Optional description for the Alarm rule in case of composite alarm. """ return self.get("alarmRule", None)
prop description : str | None
-
Optional description for the Alarm.
Expand source code
@property def description(self) -> str | None: """ Optional description for the Alarm. """ return self.get("description", None)
prop metrics : list[CloudWatchAlarmMetric]
-
The metrics evaluated for the Alarm.
Expand source code
@property def metrics(self) -> list[CloudWatchAlarmMetric]: """ The metrics evaluated for the Alarm. """ metrics = self.get("metrics") or [] return [CloudWatchAlarmMetric(i) for i in metrics]
Inherited members
class CloudWatchAlarmData (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class CloudWatchAlarmData(DictWrapper): @property def alarm_name(self) -> str: """ Alarm name. """ return self["alarmName"] @property def state(self) -> CloudWatchAlarmState: """ The current state of the Alarm. """ return CloudWatchAlarmState(self["state"]) @property def previous_state(self) -> CloudWatchAlarmState: """ The previous state of the Alarm. """ return CloudWatchAlarmState(self["previousState"]) @property def configuration(self) -> CloudWatchAlarmConfiguration: """ The configuration of the Alarm. """ return CloudWatchAlarmConfiguration(self["configuration"])
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop alarm_name : str
-
Alarm name.
Expand source code
@property def alarm_name(self) -> str: """ Alarm name. """ return self["alarmName"]
prop configuration : CloudWatchAlarmConfiguration
-
The configuration of the Alarm.
Expand source code
@property def configuration(self) -> CloudWatchAlarmConfiguration: """ The configuration of the Alarm. """ return CloudWatchAlarmConfiguration(self["configuration"])
prop previous_state : CloudWatchAlarmState
-
The previous state of the Alarm.
Expand source code
@property def previous_state(self) -> CloudWatchAlarmState: """ The previous state of the Alarm. """ return CloudWatchAlarmState(self["previousState"])
prop state : CloudWatchAlarmState
-
The current state of the Alarm.
Expand source code
@property def state(self) -> CloudWatchAlarmState: """ The current state of the Alarm. """ return CloudWatchAlarmState(self["state"])
Inherited members
class CloudWatchAlarmEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class CloudWatchAlarmEvent(DictWrapper): @property def source(self) -> Literal["aws.cloudwatch"]: """ Source of the triggered event. """ return self["source"] @property def alarm_arn(self) -> str: """ The ARN of the CloudWatch Alarm. """ return self["alarmArn"] @property def region(self) -> str: """ The AWS region in which the Alarm is active. """ return self["region"] @property def source_account_id(self) -> str: """ The AWS Account ID that the Alarm is deployed to. """ return self["accountId"] @property def timestamp(self) -> str: """ Alarm state change event timestamp in ISO-8601 format. """ return self["time"] @property def alarm_data(self) -> CloudWatchAlarmData: """ Contains basic data about the Alarm and its current and previous states. """ return CloudWatchAlarmData(self["alarmData"])
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop alarm_arn : str
-
The ARN of the CloudWatch Alarm.
Expand source code
@property def alarm_arn(self) -> str: """ The ARN of the CloudWatch Alarm. """ return self["alarmArn"]
prop alarm_data : CloudWatchAlarmData
-
Contains basic data about the Alarm and its current and previous states.
Expand source code
@property def alarm_data(self) -> CloudWatchAlarmData: """ Contains basic data about the Alarm and its current and previous states. """ return CloudWatchAlarmData(self["alarmData"])
prop region : str
-
The AWS region in which the Alarm is active.
Expand source code
@property def region(self) -> str: """ The AWS region in which the Alarm is active. """ return self["region"]
prop source : Literal['aws.cloudwatch']
-
Source of the triggered event.
Expand source code
@property def source(self) -> Literal["aws.cloudwatch"]: """ Source of the triggered event. """ return self["source"]
prop source_account_id : str
-
The AWS Account ID that the Alarm is deployed to.
Expand source code
@property def source_account_id(self) -> str: """ The AWS Account ID that the Alarm is deployed to. """ return self["accountId"]
prop timestamp : str
-
Alarm state change event timestamp in ISO-8601 format.
Expand source code
@property def timestamp(self) -> str: """ Alarm state change event timestamp in ISO-8601 format. """ return self["time"]
Inherited members
class CloudWatchAlarmMetric (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class CloudWatchAlarmMetric(DictWrapper): @property def metric_id(self) -> str: """ Unique ID of the alarm metric. """ return self["id"] @property def expression(self) -> str | None: """ Optional expression of the alarm metric. """ return self.get("expression", None) @property def label(self) -> str | None: """ Optional label of the alarm metric. """ return self.get("label", None) @property def return_data(self) -> bool: """ Whether this metric data is used to determine the state of the alarm or not. """ return self["returnData"] @property def metric_stat(self) -> CloudWatchAlarmMetricStat: return CloudWatchAlarmMetricStat(self["metricStat"])
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop expression : str | None
-
Optional expression of the alarm metric.
Expand source code
@property def expression(self) -> str | None: """ Optional expression of the alarm metric. """ return self.get("expression", None)
prop label : str | None
-
Optional label of the alarm metric.
Expand source code
@property def label(self) -> str | None: """ Optional label of the alarm metric. """ return self.get("label", None)
prop metric_id : str
-
Unique ID of the alarm metric.
Expand source code
@property def metric_id(self) -> str: """ Unique ID of the alarm metric. """ return self["id"]
prop metric_stat : CloudWatchAlarmMetricStat
-
Expand source code
@property def metric_stat(self) -> CloudWatchAlarmMetricStat: return CloudWatchAlarmMetricStat(self["metricStat"])
prop return_data : bool
-
Whether this metric data is used to determine the state of the alarm or not.
Expand source code
@property def return_data(self) -> bool: """ Whether this metric data is used to determine the state of the alarm or not. """ return self["returnData"]
Inherited members
class CloudWatchAlarmMetricStat (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class CloudWatchAlarmMetricStat(DictWrapper): @property def period(self) -> int | None: """ Metric evaluation period, in seconds. """ return self.get("period", None) @property def stat(self) -> str | None: """ Statistical aggregation of metric points, e.g. Average, SampleCount, etc. """ return self.get("stat", None) @property def unit(self) -> str | None: """ Unit for metric. """ return self.get("unit", None) @property def metric(self) -> dict: """ Metric details """ return self.get("metric") or {}
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop metric : dict
-
Metric details
Expand source code
@property def metric(self) -> dict: """ Metric details """ return self.get("metric") or {}
prop period : int | None
-
Metric evaluation period, in seconds.
Expand source code
@property def period(self) -> int | None: """ Metric evaluation period, in seconds. """ return self.get("period", None)
prop stat : str | None
-
Statistical aggregation of metric points, e.g. Average, SampleCount, etc.
Expand source code
@property def stat(self) -> str | None: """ Statistical aggregation of metric points, e.g. Average, SampleCount, etc. """ return self.get("stat", None)
prop unit : str | None
-
Unit for metric.
Expand source code
@property def unit(self) -> str | None: """ Unit for metric. """ return self.get("unit", None)
Inherited members
class CloudWatchAlarmState (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class CloudWatchAlarmState(DictWrapper): @property def value(self) -> Literal["OK", "ALARM", "INSUFFICIENT_DATA"]: """ Overall state of the alarm. """ return self["value"] @property def reason(self) -> str: """ Reason why alarm was changed to this state. """ return self["reason"] @property def reason_data(self) -> str: """ Additional data to back up the reason, usually contains the evaluated data points, the calculated threshold and timestamps. """ return self["reasonData"] @cached_property def reason_data_decoded(self) -> Any | None: """ Deserialized version of reason_data. """ return self._json_deserializer(self.reason_data) if self.reason_data else None @property def actions_suppressed_by(self) -> Literal["Alarm", "ExtensionPeriod", "WaitPeriod"] | None: """ Describes why the actions when the value is `ALARM` are suppressed in a composite alarm. """ return self.get("actionsSuppressedBy", None) @property def actions_suppressed_reason(self) -> str | None: """ Captures the reason for action suppression. """ return self.get("actionsSuppressedReason", None) @property def timestamp(self) -> str: """ Timestamp of this state change in ISO-8601 format. """ return self["timestamp"]
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop actions_suppressed_by : Literal['Alarm', 'ExtensionPeriod', 'WaitPeriod'] | None
-
Describes why the actions when the value is
ALARM
are suppressed in a composite alarm.Expand source code
@property def actions_suppressed_by(self) -> Literal["Alarm", "ExtensionPeriod", "WaitPeriod"] | None: """ Describes why the actions when the value is `ALARM` are suppressed in a composite alarm. """ return self.get("actionsSuppressedBy", None)
prop actions_suppressed_reason : str | None
-
Captures the reason for action suppression.
Expand source code
@property def actions_suppressed_reason(self) -> str | None: """ Captures the reason for action suppression. """ return self.get("actionsSuppressedReason", None)
prop reason : str
-
Reason why alarm was changed to this state.
Expand source code
@property def reason(self) -> str: """ Reason why alarm was changed to this state. """ return self["reason"]
prop reason_data : str
-
Additional data to back up the reason, usually contains the evaluated data points, the calculated threshold and timestamps.
Expand source code
@property def reason_data(self) -> str: """ Additional data to back up the reason, usually contains the evaluated data points, the calculated threshold and timestamps. """ return self["reasonData"]
var reason_data_decoded
-
Deserialized version of reason_data.
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val
prop timestamp : str
-
Timestamp of this state change in ISO-8601 format.
Expand source code
@property def timestamp(self) -> str: """ Timestamp of this state change in ISO-8601 format. """ return self["timestamp"]
prop value : Literal['OK', 'ALARM', 'INSUFFICIENT_DATA']
-
Overall state of the alarm.
Expand source code
@property def value(self) -> Literal["OK", "ALARM", "INSUFFICIENT_DATA"]: """ Overall state of the alarm. """ return self["value"]
Inherited members
class CloudWatchDashboardCustomWidgetEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
CloudWatch dashboard custom widget event
You can use a Lambda function to create a custom widget on a CloudWatch dashboard.
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class CloudWatchDashboardCustomWidgetEvent(DictWrapper): """CloudWatch dashboard custom widget event You can use a Lambda function to create a custom widget on a CloudWatch dashboard. Documentation: ------------- - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/add_custom_widget_dashboard_about.html """ @property def describe(self) -> bool: """Display widget documentation""" return bool(self.get("describe", False)) @property def widget_context(self) -> CloudWatchWidgetContext | None: """The widget context""" if self.get("widgetContext"): return CloudWatchWidgetContext(self["widgetContext"]) return None
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop describe : bool
-
Display widget documentation
Expand source code
@property def describe(self) -> bool: """Display widget documentation""" return bool(self.get("describe", False))
prop widget_context : CloudWatchWidgetContext | None
-
The widget context
Expand source code
@property def widget_context(self) -> CloudWatchWidgetContext | None: """The widget context""" if self.get("widgetContext"): return CloudWatchWidgetContext(self["widgetContext"]) return None
Inherited members
class CloudWatchLogsEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
CloudWatch Logs log stream event
You can use a Lambda function to monitor and analyze logs from an Amazon CloudWatch Logs log stream.
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class CloudWatchLogsEvent(DictWrapper): """CloudWatch Logs log stream event You can use a Lambda function to monitor and analyze logs from an Amazon CloudWatch Logs log stream. Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html """ _decompressed_logs_data = None _json_logs_data = None @property def raw_logs_data(self) -> str: """The value of the `data` field is a Base64 encoded ZIP archive.""" return self["awslogs"]["data"] @property def decompress_logs_data(self) -> bytes: """Decode and decompress log data""" if self._decompressed_logs_data is None: payload = base64.b64decode(self.raw_logs_data) self._decompressed_logs_data = zlib.decompress(payload, zlib.MAX_WBITS | 32) return self._decompressed_logs_data def parse_logs_data(self) -> CloudWatchLogsDecodedData: """Decode, decompress and parse json data as CloudWatchLogsDecodedData""" if self._json_logs_data is None: self._json_logs_data = self._json_deserializer(self.decompress_logs_data.decode("UTF-8")) return CloudWatchLogsDecodedData(self._json_logs_data)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop decompress_logs_data : bytes
-
Decode and decompress log data
Expand source code
@property def decompress_logs_data(self) -> bytes: """Decode and decompress log data""" if self._decompressed_logs_data is None: payload = base64.b64decode(self.raw_logs_data) self._decompressed_logs_data = zlib.decompress(payload, zlib.MAX_WBITS | 32) return self._decompressed_logs_data
prop raw_logs_data : str
-
The value of the
data
field is a Base64 encoded ZIP archive.Expand source code
@property def raw_logs_data(self) -> str: """The value of the `data` field is a Base64 encoded ZIP archive.""" return self["awslogs"]["data"]
Methods
def parse_logs_data(self) ‑> CloudWatchLogsDecodedData
-
Decode, decompress and parse json data as CloudWatchLogsDecodedData
Inherited members
class CodeDeployLifecycleHookEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class CodeDeployLifecycleHookEvent(DictWrapper): @property def deployment_id(self) -> str: """The unique ID of the calling CodeDeploy Deployment.""" return self["DeploymentId"] @property def lifecycle_event_hook_execution_id(self) -> str: """The unique ID of a deployments lifecycle hook.""" return self["LifecycleEventHookExecutionId"]
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop deployment_id : str
-
The unique ID of the calling CodeDeploy Deployment.
Expand source code
@property def deployment_id(self) -> str: """The unique ID of the calling CodeDeploy Deployment.""" return self["DeploymentId"]
prop lifecycle_event_hook_execution_id : str
-
The unique ID of a deployments lifecycle hook.
Expand source code
@property def lifecycle_event_hook_execution_id(self) -> str: """The unique ID of a deployments lifecycle hook.""" return self["LifecycleEventHookExecutionId"]
Inherited members
class CodePipelineJobEvent (data: dict[str, Any])
-
AWS CodePipeline Job Event
Documentation:
- https://docs.aws.amazon.com/codepipeline/latest/userguide/actions-invoke-lambda-function.html
- https://docs.aws.amazon.com/lambda/latest/dg/services-codepipeline.html
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class CodePipelineJobEvent(DictWrapper): """AWS CodePipeline Job Event Documentation: ------------- - https://docs.aws.amazon.com/codepipeline/latest/userguide/actions-invoke-lambda-function.html - https://docs.aws.amazon.com/lambda/latest/dg/services-codepipeline.html """ def __init__(self, data: dict[str, Any]): super().__init__(data) self._job = self["CodePipeline.job"] @property def get_id(self) -> str: """Job id""" return self._job["id"] @property def account_id(self) -> str: """Account id""" return self._job["accountId"] @property def data(self) -> CodePipelineData: """Code pipeline jab data""" return CodePipelineData(self._job["data"]) @property def user_parameters(self) -> str | None: """Action configuration user parameters""" return self.data.action_configuration.configuration.user_parameters @property def decoded_user_parameters(self) -> dict[str, Any]: """Json Decoded action configuration user parameters""" return self.data.action_configuration.configuration.decoded_user_parameters @property def input_bucket_name(self) -> str: """Get the first input artifact bucket name""" return self.data.input_artifacts[0].location.s3_location.bucket_name @property def input_object_key(self) -> str: """Get the first input artifact order key unquote plus""" return self.data.input_artifacts[0].location.s3_location.object_key def setup_s3_client(self): """Creates an S3 client Uses the credentials passed in the event by CodePipeline. These credentials can be used to access the artifact bucket. Returns ------- BaseClient An S3 client with the appropriate credentials """ # IMPORTING boto3 within the FUNCTION and not at the top level to get # it only when we explicitly want it for better performance. import boto3 from aws_lambda_powertools.shared import user_agent s3 = boto3.client( "s3", aws_access_key_id=self.data.artifact_credentials.access_key_id, aws_secret_access_key=self.data.artifact_credentials.secret_access_key, aws_session_token=self.data.artifact_credentials.session_token, ) user_agent.register_feature_to_client(client=s3, feature="data_classes") return s3 def find_input_artifact(self, artifact_name: str) -> CodePipelineArtifact | None: """Find an input artifact by artifact name Parameters ---------- artifact_name : str The name of the input artifact to look for Returns ------- CodePipelineArtifact, None Matching CodePipelineArtifact if found """ for artifact in self.data.input_artifacts: if artifact.name == artifact_name: return artifact return None def find_output_artifact(self, artifact_name: str) -> CodePipelineArtifact | None: """Find an output artifact by artifact name Parameters ---------- artifact_name : str The name of the output artifact to look for Returns ------- CodePipelineArtifact, None Matching CodePipelineArtifact if found """ for artifact in self.data.output_artifacts: if artifact.name == artifact_name: return artifact return None def get_artifact(self, artifact_name: str, filename: str | None = None) -> str | None: """Get a file within an artifact zip on s3 Parameters ---------- artifact_name : str Name of the S3 artifact to download filename : str The file name within the artifact zip to extract as a string If None, this will return the raw object body. Returns ------- str, None Returns the contents file contents as a string """ artifact = self.find_input_artifact(artifact_name) if artifact is None: return None s3 = self.setup_s3_client() bucket = artifact.location.s3_location.bucket_name key = artifact.location.s3_location.key if filename: with tempfile.NamedTemporaryFile() as tmp_file: s3.download_file(bucket, key, tmp_file.name) with zipfile.ZipFile(tmp_file.name, "r") as zip_file: return zip_file.read(filename).decode("UTF-8") return s3.get_object(Bucket=bucket, Key=key)["Body"].read() def put_artifact(self, artifact_name: str, body: Any, content_type: str) -> None: """Writes an object to an s3 output artifact. Parameters ---------- artifact_name : str Name of the S3 artifact to upload body: Any The data to be written. Binary files should use io.BytesIO. content_type: str The content type of the data. Returns ------- None """ artifact = self.find_output_artifact(artifact_name) if artifact is None: raise ValueError(f"Artifact not found: {artifact_name}.") s3 = self.setup_s3_client() bucket = artifact.location.s3_location.bucket_name key = artifact.location.s3_location.key # boto3 doesn't support None to omit the parameter when using ServerSideEncryption and SSEKMSKeyId # So we are using if/else instead. if self.data.encryption_key: encryption_key_id = self.data.encryption_key.get_id encryption_key_type = self.data.encryption_key.get_type if encryption_key_type == "KMS": encryption_key_type = "aws:kms" s3.put_object( Bucket=bucket, Key=key, ContentType=content_type, Body=body, ServerSideEncryption=encryption_key_type, SSEKMSKeyId=encryption_key_id, BucketKeyEnabled=True, ) else: s3.put_object( Bucket=bucket, Key=key, ContentType=content_type, Body=body, BucketKeyEnabled=True, )
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop account_id : str
-
Account id
Expand source code
@property def account_id(self) -> str: """Account id""" return self._job["accountId"]
prop data : CodePipelineData
-
Code pipeline jab data
Expand source code
@property def data(self) -> CodePipelineData: """Code pipeline jab data""" return CodePipelineData(self._job["data"])
prop decoded_user_parameters : dict[str, Any]
-
Json Decoded action configuration user parameters
Expand source code
@property def decoded_user_parameters(self) -> dict[str, Any]: """Json Decoded action configuration user parameters""" return self.data.action_configuration.configuration.decoded_user_parameters
prop get_id : str
-
Job id
Expand source code
@property def get_id(self) -> str: """Job id""" return self._job["id"]
prop input_bucket_name : str
-
Get the first input artifact bucket name
Expand source code
@property def input_bucket_name(self) -> str: """Get the first input artifact bucket name""" return self.data.input_artifacts[0].location.s3_location.bucket_name
prop input_object_key : str
-
Get the first input artifact order key unquote plus
Expand source code
@property def input_object_key(self) -> str: """Get the first input artifact order key unquote plus""" return self.data.input_artifacts[0].location.s3_location.object_key
prop user_parameters : str | None
-
Action configuration user parameters
Expand source code
@property def user_parameters(self) -> str | None: """Action configuration user parameters""" return self.data.action_configuration.configuration.user_parameters
Methods
def find_input_artifact(self, artifact_name: str) ‑> CodePipelineArtifact | None
-
Find an input artifact by artifact name
Parameters
artifact_name
:str
- The name of the input artifact to look for
Returns
CodePipelineArtifact, None
- Matching CodePipelineArtifact if found
def find_output_artifact(self, artifact_name: str) ‑> CodePipelineArtifact | None
-
Find an output artifact by artifact name
Parameters
artifact_name
:str
- The name of the output artifact to look for
Returns
CodePipelineArtifact, None
- Matching CodePipelineArtifact if found
def get_artifact(self, artifact_name: str, filename: str | None = None) ‑> str | None
-
Get a file within an artifact zip on s3
Parameters
artifact_name
:str
- Name of the S3 artifact to download
filename
:str
- The file name within the artifact zip to extract as a string If None, this will return the raw object body.
Returns
str, None
- Returns the contents file contents as a string
def put_artifact(self, artifact_name: str, body: Any, content_type: str) ‑> None
-
Writes an object to an s3 output artifact.
Parameters
artifact_name
:str
- Name of the S3 artifact to upload
body
:Any
- The data to be written. Binary files should use io.BytesIO.
content_type
:str
- The content type of the data.
Returns
None
def setup_s3_client(self)
-
Creates an S3 client
Uses the credentials passed in the event by CodePipeline. These credentials can be used to access the artifact bucket.
Returns
BaseClient
- An S3 client with the appropriate credentials
Inherited members
class ConnectContactFlowEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Amazon Connect contact flow event
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class ConnectContactFlowEvent(DictWrapper): """Amazon Connect contact flow event Documentation: ------------- - https://docs.aws.amazon.com/connect/latest/adminguide/connect-lambda-functions.html """ @property def contact_data(self) -> ConnectContactFlowData: """This is always passed by Amazon Connect for every contact. Some parameters are optional.""" return ConnectContactFlowData(self["Details"]["ContactData"]) @property def parameters(self) -> dict[str, str]: """These are parameters specific to this call that were defined when you created the Lambda function.""" return self["Details"]["Parameters"]
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop contact_data : ConnectContactFlowData
-
This is always passed by Amazon Connect for every contact. Some parameters are optional.
Expand source code
@property def contact_data(self) -> ConnectContactFlowData: """This is always passed by Amazon Connect for every contact. Some parameters are optional.""" return ConnectContactFlowData(self["Details"]["ContactData"])
prop parameters : dict[str, str]
-
These are parameters specific to this call that were defined when you created the Lambda function.
Expand source code
@property def parameters(self) -> dict[str, str]: """These are parameters specific to this call that were defined when you created the Lambda function.""" return self["Details"]["Parameters"]
Inherited members
class DynamoDBStreamEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Dynamo DB Stream Event
Documentation:
Example
Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.
from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent from aws_lambda_powertools.utilities.typing import LambdaContext @event_source(data_class=DynamoDBStreamEvent) def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): for record in event.records: # {"N": "123.45"} => Decimal("123.45") key: str = record.dynamodb.keys["id"] print(key)
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class DynamoDBStreamEvent(DictWrapper): """Dynamo DB Stream Event Documentation: ------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html Example ------- **Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.** from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent from aws_lambda_powertools.utilities.typing import LambdaContext @event_source(data_class=DynamoDBStreamEvent) def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): for record in event.records: # {"N": "123.45"} => Decimal("123.45") key: str = record.dynamodb.keys["id"] print(key) """ @property def records(self) -> Iterator[DynamoDBRecord]: for record in self["Records"]: yield DynamoDBRecord(record)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop records : Iterator[DynamoDBRecord]
-
Expand source code
@property def records(self) -> Iterator[DynamoDBRecord]: for record in self["Records"]: yield DynamoDBRecord(record)
Inherited members
class EventBridgeEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Amazon EventBridge Event
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class EventBridgeEvent(DictWrapper): """Amazon EventBridge Event Documentation: -------------- - https://docs.aws.amazon.com/eventbridge/latest/userguide/aws-events.html """ @property def get_id(self) -> str: """A unique value is generated for every event. This can be helpful in tracing events as they move through rules to targets, and are processed.""" # Note: this name conflicts with existing python builtins return self["id"] @property def version(self) -> str: """By default, this is set to 0 (zero) in all events.""" return self["version"] @property def account(self) -> str: """The 12-digit number identifying an AWS account.""" return self["account"] @property def time(self) -> str: """The event timestamp, which can be specified by the service originating the event. If the event spans a time interval, the service might choose to report the start time, so this value can be noticeably before the time the event is actually received. """ return self["time"] @property def region(self) -> str: """Identifies the AWS region where the event originated.""" return self["region"] @property def resources(self) -> list[str]: """This JSON array contains ARNs that identify resources that are involved in the event. Inclusion of these ARNs is at the discretion of the service.""" return self["resources"] @property def source(self) -> str: """Identifies the service that sourced the event. All events sourced from within AWS begin with "aws." """ return self["source"] @property def detail_type(self) -> str: """Identifies, in combination with the source field, the fields and values that appear in the detail field.""" return self["detail-type"] @property def detail(self) -> dict[str, Any]: """A JSON object, whose content is at the discretion of the service originating the event.""" return self["detail"] @property def replay_name(self) -> str | None: """Identifies whether the event is being replayed and what is the name of the replay.""" return self["replay-name"]
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Subclasses
Instance variables
prop account : str
-
The 12-digit number identifying an AWS account.
Expand source code
@property def account(self) -> str: """The 12-digit number identifying an AWS account.""" return self["account"]
prop detail : dict[str, Any]
-
A JSON object, whose content is at the discretion of the service originating the event.
Expand source code
@property def detail(self) -> dict[str, Any]: """A JSON object, whose content is at the discretion of the service originating the event.""" return self["detail"]
prop detail_type : str
-
Identifies, in combination with the source field, the fields and values that appear in the detail field.
Expand source code
@property def detail_type(self) -> str: """Identifies, in combination with the source field, the fields and values that appear in the detail field.""" return self["detail-type"]
prop get_id : str
-
A unique value is generated for every event. This can be helpful in tracing events as they move through rules to targets, and are processed.
Expand source code
@property def get_id(self) -> str: """A unique value is generated for every event. This can be helpful in tracing events as they move through rules to targets, and are processed.""" # Note: this name conflicts with existing python builtins return self["id"]
prop region : str
-
Identifies the AWS region where the event originated.
Expand source code
@property def region(self) -> str: """Identifies the AWS region where the event originated.""" return self["region"]
prop replay_name : str | None
-
Identifies whether the event is being replayed and what is the name of the replay.
Expand source code
@property def replay_name(self) -> str | None: """Identifies whether the event is being replayed and what is the name of the replay.""" return self["replay-name"]
prop resources : list[str]
-
This JSON array contains ARNs that identify resources that are involved in the event. Inclusion of these ARNs is at the discretion of the service.
Expand source code
@property def resources(self) -> list[str]: """This JSON array contains ARNs that identify resources that are involved in the event. Inclusion of these ARNs is at the discretion of the service.""" return self["resources"]
prop source : str
-
Identifies the service that sourced the event. All events sourced from within AWS begin with "aws."
Expand source code
@property def source(self) -> str: """Identifies the service that sourced the event. All events sourced from within AWS begin with "aws." """ return self["source"]
prop time : str
-
The event timestamp, which can be specified by the service originating the event.
If the event spans a time interval, the service might choose to report the start time, so this value can be noticeably before the time the event is actually received.
Expand source code
@property def time(self) -> str: """The event timestamp, which can be specified by the service originating the event. If the event spans a time interval, the service might choose to report the start time, so this value can be noticeably before the time the event is actually received. """ return self["time"]
prop version : str
-
By default, this is set to 0 (zero) in all events.
Expand source code
@property def version(self) -> str: """By default, this is set to 0 (zero) in all events.""" return self["version"]
Inherited members
class KafkaEvent (data: dict[str, Any])
-
Self-managed or MSK Apache Kafka event trigger Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KafkaEvent(DictWrapper): """Self-managed or MSK Apache Kafka event trigger Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html """ def __init__(self, data: dict[str, Any]): super().__init__(data) self._records: Iterator[KafkaEventRecord] | None = None @property def event_source(self) -> str: """The AWS service from which the Kafka event record originated.""" return self["eventSource"] @property def event_source_arn(self) -> str | None: """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.""" return self.get("eventSourceArn") @property def bootstrap_servers(self) -> str: """The Kafka bootstrap URL.""" return self["bootstrapServers"] @property def decoded_bootstrap_servers(self) -> list[str]: """The decoded Kafka bootstrap URL.""" return self.bootstrap_servers.split(",") @property def records(self) -> Iterator[KafkaEventRecord]: """The Kafka records.""" for chunk in self["records"].values(): for record in chunk: yield KafkaEventRecord(data=record, json_deserializer=self._json_deserializer) @property def record(self) -> KafkaEventRecord: """ Returns the next Kafka record using an iterator. Returns ------- KafkaEventRecord The next Kafka record. Raises ------ StopIteration If there are no more records available. """ if self._records is None: self._records = self.records return next(self._records)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop bootstrap_servers : str
-
The Kafka bootstrap URL.
Expand source code
@property def bootstrap_servers(self) -> str: """The Kafka bootstrap URL.""" return self["bootstrapServers"]
prop decoded_bootstrap_servers : list[str]
-
The decoded Kafka bootstrap URL.
Expand source code
@property def decoded_bootstrap_servers(self) -> list[str]: """The decoded Kafka bootstrap URL.""" return self.bootstrap_servers.split(",")
prop event_source : str
-
The AWS service from which the Kafka event record originated.
Expand source code
@property def event_source(self) -> str: """The AWS service from which the Kafka event record originated.""" return self["eventSource"]
prop event_source_arn : str | None
-
The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.
Expand source code
@property def event_source_arn(self) -> str | None: """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.""" return self.get("eventSourceArn")
prop record : KafkaEventRecord
-
Returns the next Kafka record using an iterator.
Returns
KafkaEventRecord
- The next Kafka record.
Raises
StopIteration
- If there are no more records available.
Expand source code
@property def record(self) -> KafkaEventRecord: """ Returns the next Kafka record using an iterator. Returns ------- KafkaEventRecord The next Kafka record. Raises ------ StopIteration If there are no more records available. """ if self._records is None: self._records = self.records return next(self._records)
prop records : Iterator[KafkaEventRecord]
-
The Kafka records.
Expand source code
@property def records(self) -> Iterator[KafkaEventRecord]: """The Kafka records.""" for chunk in self["records"].values(): for record in chunk: yield KafkaEventRecord(data=record, json_deserializer=self._json_deserializer)
Inherited members
class KinesisFirehoseDataTransformationRecord (record_id: str, result: "Literal['Ok', 'Dropped', 'ProcessingFailed']" = 'Ok', data: str = '', metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None, json_serializer: Callable = <function dumps>, json_deserializer: Callable = <function loads>)
-
Record in Kinesis Data Firehose response object.
Parameters
record_id
:str
- uniquely identifies this record within the current batch
result
:Literal["Ok", "Dropped", "ProcessingFailed"]
- record data transformation status, whether it succeeded, should be dropped, or failed.
data
:str
-
base64-encoded payload, by default empty string.
Use
data_from_text
ordata_from_json
methods to convert data if needed. metadata
:KinesisFirehoseDataTransformationRecordMetadata | None
-
Metadata associated with this record; can contain partition keys.
See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
json_serializer
:Callable
- function to serialize
obj
to a JSON formattedstr
, by default json.dumps json_deserializer
:Callable
- function to deserialize
str
,bytes
, bytearraycontaining a JSON document to a Python
obj`, by default json.loads
Documentation:
Expand source code
@dataclass(repr=False, order=False) class KinesisFirehoseDataTransformationRecord: """Record in Kinesis Data Firehose response object. Parameters ---------- record_id: str uniquely identifies this record within the current batch result: Literal["Ok", "Dropped", "ProcessingFailed"] record data transformation status, whether it succeeded, should be dropped, or failed. data: str base64-encoded payload, by default empty string. Use `data_from_text` or `data_from_json` methods to convert data if needed. metadata: KinesisFirehoseDataTransformationRecordMetadata | None Metadata associated with this record; can contain partition keys. See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html json_serializer: Callable function to serialize `obj` to a JSON formatted `str`, by default json.dumps json_deserializer: Callable function to deserialize `str`, `bytes`, bytearray` containing a JSON document to a Python `obj`, by default json.loads Documentation: -------------- - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html """ _valid_result_types: ClassVar[tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed") record_id: str result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok" data: str = "" metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None json_serializer: Callable = json.dumps json_deserializer: Callable = json.loads def asdict(self) -> dict: if self.result not in self._valid_result_types: warnings.warn( stacklevel=1, message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"', ) record: dict[str, Any] = { "recordId": self.record_id, "result": self.result, "data": self.data, } if self.metadata: record["metadata"] = self.metadata.asdict() return record @property def data_as_bytes(self) -> bytes: """Decoded base64-encoded data as bytes""" if not self.data: return b"" return base64.b64decode(self.data) @property def data_as_text(self) -> str: """Decoded base64-encoded data as text""" if not self.data: return "" return self.data_as_bytes.decode("utf-8") @cached_property def data_as_json(self) -> dict: """Decoded base64-encoded data loaded to json""" if not self.data: return {} return self.json_deserializer(self.data_as_text)
Class variables
var data : str
var metadata : KinesisFirehoseDataTransformationRecordMetadata | None
var record_id : str
var result : Literal['Ok', 'Dropped', 'ProcessingFailed']
Instance variables
prop data_as_bytes : bytes
-
Decoded base64-encoded data as bytes
Expand source code
@property def data_as_bytes(self) -> bytes: """Decoded base64-encoded data as bytes""" if not self.data: return b"" return base64.b64decode(self.data)
var data_as_json
-
Decoded base64-encoded data loaded to json
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val
prop data_as_text : str
-
Decoded base64-encoded data as text
Expand source code
@property def data_as_text(self) -> str: """Decoded base64-encoded data as text""" if not self.data: return "" return self.data_as_bytes.decode("utf-8")
Methods
def asdict(self) ‑> dict
def json_deserializer(s, *, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw)
-
Deserialize
s
(astr
,bytes
orbytearray
instance containing a JSON document) to a Python object.object_hook
is an optional function that will be called with the result of any object literal decode (adict
). The return value ofobject_hook
will be used instead of thedict
. This feature can be used to implement custom decoders (e.g. JSON-RPC class hinting).object_pairs_hook
is an optional function that will be called with the result of any object literal decoded with an ordered list of pairs. The return value ofobject_pairs_hook
will be used instead of thedict
. This feature can be used to implement custom decoders. Ifobject_hook
is also defined, theobject_pairs_hook
takes priority.parse_float
, if specified, will be called with the string of every JSON float to be decoded. By default this is equivalent to float(num_str). This can be used to use another datatype or parser for JSON floats (e.g. decimal.Decimal).parse_int
, if specified, will be called with the string of every JSON int to be decoded. By default this is equivalent to int(num_str). This can be used to use another datatype or parser for JSON integers (e.g. float).parse_constant
, if specified, will be called with one of the following strings: -Infinity, Infinity, NaN. This can be used to raise an exception if invalid JSON numbers are encountered.To use a custom
JSONDecoder
subclass, specify it with thecls
kwarg; otherwiseJSONDecoder
is used. def json_serializer(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, default=None, sort_keys=False, **kw)
-
Serialize
obj
to a JSON formattedstr
.If
skipkeys
is true thendict
keys that are not basic types (str
,int
,float
,bool
,None
) will be skipped instead of raising aTypeError
.If
ensure_ascii
is false, then the return value can contain non-ASCII characters if they appear in strings contained inobj
. Otherwise, all such characters are escaped in JSON strings.If
check_circular
is false, then the circular reference check for container types will be skipped and a circular reference will result in anRecursionError
(or worse).If
allow_nan
is false, then it will be aValueError
to serialize out of rangefloat
values (nan
,inf
,-inf
) in strict compliance of the JSON specification, instead of using the JavaScript equivalents (NaN
,Infinity
,-Infinity
).If
indent
is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines.None
is the most compact representation.If specified,
separators
should be an(item_separator, key_separator)
tuple. The default is(', ', ': ')
if indent isNone
and(',', ': ')
otherwise. To get the most compact JSON representation, you should specify(',', ':')
to eliminate whitespace.default(obj)
is a function that should return a serializable version of obj or raise TypeError. The default simply raises TypeError.If sort_keys is true (default:
False
), then the output of dictionaries will be sorted by key.To use a custom
JSONEncoder
subclass (e.g. one that overrides the.default()
method to serialize additional types), specify it with thecls
kwarg; otherwiseJSONEncoder
is used.
class KinesisFirehoseDataTransformationRecordMetadata (partition_keys: dict[str, str] = <factory>)
-
Metadata in Firehose Data Transform Record.
Parameters
partition_keys
:dict[str, str]
- A dict of partition keys/value in string format, e.g.
{"year":"2023","month":"09"}
Documentation:
Expand source code
@dataclass(repr=False, order=False, frozen=True) class KinesisFirehoseDataTransformationRecordMetadata: """ Metadata in Firehose Data Transform Record. Parameters ---------- partition_keys: dict[str, str] A dict of partition keys/value in string format, e.g. `{"year":"2023","month":"09"}` Documentation: -------------- - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html """ partition_keys: dict[str, str] = field(default_factory=lambda: {}) def asdict(self) -> dict: if self.partition_keys is not None: return {"partitionKeys": self.partition_keys} return {}
Class variables
var partition_keys : dict[str, str]
Methods
def asdict(self) ‑> dict
class KinesisFirehoseDataTransformationResponse (records: list[KinesisFirehoseDataTransformationRecord] = <factory>)
-
Kinesis Data Firehose response object
Documentation:
Parameters
records
:list[KinesisFirehoseResponseRecord]
- records of Kinesis Data Firehose response object,
optional parameter at start. can be added later using
add_record
function.
Examples
Transforming data records
from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, KinesisFirehoseEvent, ) from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext def lambda_handler(event: dict, context: LambdaContext): firehose_event = KinesisFirehoseEvent(event) result = KinesisFirehoseDataTransformationResponse() for record in firehose_event.records: payload = record.data_as_text # base64 decoded data as str ## generate data to return transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, data=base64_from_json(transformed_data), ) result.add_record(processed_record) # return transformed records return result.asdict()
Expand source code
@dataclass(repr=False, order=False) class KinesisFirehoseDataTransformationResponse: """Kinesis Data Firehose response object Documentation: -------------- - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html Parameters ---------- records : list[KinesisFirehoseResponseRecord] records of Kinesis Data Firehose response object, optional parameter at start. can be added later using `add_record` function. Examples -------- **Transforming data records** ```python from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, KinesisFirehoseEvent, ) from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext def lambda_handler(event: dict, context: LambdaContext): firehose_event = KinesisFirehoseEvent(event) result = KinesisFirehoseDataTransformationResponse() for record in firehose_event.records: payload = record.data_as_text # base64 decoded data as str ## generate data to return transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, data=base64_from_json(transformed_data), ) result.add_record(processed_record) # return transformed records return result.asdict() ``` """ records: list[KinesisFirehoseDataTransformationRecord] = field(default_factory=list) def add_record(self, record: KinesisFirehoseDataTransformationRecord): self.records.append(record) def asdict(self) -> dict: if not self.records: raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response") return {"records": [record.asdict() for record in self.records]}
Class variables
var records : list[KinesisFirehoseDataTransformationRecord]
Methods
def add_record(self, record: KinesisFirehoseDataTransformationRecord)
def asdict(self) ‑> dict
class KinesisFirehoseEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Kinesis Data Firehose event
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KinesisFirehoseEvent(DictWrapper): """Kinesis Data Firehose event Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html """ @property def invocation_id(self) -> str: """Unique ID for for Lambda invocation""" return self["invocationId"] @property def delivery_stream_arn(self) -> str: """ARN of the Firehose Data Firehose Delivery Stream""" return self["deliveryStreamArn"] @property def source_kinesis_stream_arn(self) -> str | None: """ARN of the Kinesis Stream; present only when Kinesis Stream is source""" return self.get("sourceKinesisStreamArn") @property def region(self) -> str: """AWS region where the event originated eg: us-east-1""" return self["region"] @property def records(self) -> Iterator[KinesisFirehoseRecord]: for record in self["records"]: yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop delivery_stream_arn : str
-
ARN of the Firehose Data Firehose Delivery Stream
Expand source code
@property def delivery_stream_arn(self) -> str: """ARN of the Firehose Data Firehose Delivery Stream""" return self["deliveryStreamArn"]
prop invocation_id : str
-
Unique ID for for Lambda invocation
Expand source code
@property def invocation_id(self) -> str: """Unique ID for for Lambda invocation""" return self["invocationId"]
prop records : Iterator[KinesisFirehoseRecord]
-
Expand source code
@property def records(self) -> Iterator[KinesisFirehoseRecord]: for record in self["records"]: yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)
prop region : str
-
AWS region where the event originated eg: us-east-1
Expand source code
@property def region(self) -> str: """AWS region where the event originated eg: us-east-1""" return self["region"]
prop source_kinesis_stream_arn : str | None
-
ARN of the Kinesis Stream; present only when Kinesis Stream is source
Expand source code
@property def source_kinesis_stream_arn(self) -> str | None: """ARN of the Kinesis Stream; present only when Kinesis Stream is source""" return self.get("sourceKinesisStreamArn")
Inherited members
class KinesisStreamEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Kinesis stream event
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class KinesisStreamEvent(DictWrapper): """Kinesis stream event Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html """ @property def records(self) -> Iterator[KinesisStreamRecord]: for record in self["Records"]: yield KinesisStreamRecord(record)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop records : Iterator[KinesisStreamRecord]
-
Expand source code
@property def records(self) -> Iterator[KinesisStreamRecord]: for record in self["Records"]: yield KinesisStreamRecord(record)
Inherited members
class LambdaFunctionUrlEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
AWS Lambda Function URL event
Notes:
Lambda Function URL follows the API Gateway HTTP APIs Payload Format Version 2.0.
Keys related to API Gateway features not available in Function URL use a sentinel value (e.g.
routeKey
,stage
).Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html - https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html#urls-payloads
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class LambdaFunctionUrlEvent(APIGatewayProxyEventV2): """AWS Lambda Function URL event Notes: ----- Lambda Function URL follows the API Gateway HTTP APIs Payload Format Version 2.0. Keys related to API Gateway features not available in Function URL use a sentinel value (e.g.`routeKey`, `stage`). Documentation: - https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html - https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html#urls-payloads """ pass
Ancestors
- APIGatewayProxyEventV2
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Inherited members
class S3BatchOperationEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Amazon S3BatchOperation Event
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class S3BatchOperationEvent(DictWrapper): """Amazon S3BatchOperation Event Documentation: -------------- - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html """ @property def invocation_id(self) -> str: """Get the identifier of the invocation request""" return self["invocationId"] @property def invocation_schema_version(self) -> Literal["1.0", "2.0"]: """ Get the schema version for the payload that Batch Operations sends when invoking an AWS Lambda function. Either '1.0' or '2.0'. """ return self["invocationSchemaVersion"] @property def tasks(self) -> Iterator[S3BatchOperationTask]: """Get s3 batch operation tasks""" for task in self["tasks"]: yield S3BatchOperationTask(task) @property def task(self) -> S3BatchOperationTask: """Get the first s3 batch operation task""" return next(self.tasks) @property def job(self) -> S3BatchOperationJob: """Get the s3 batch operation job""" return S3BatchOperationJob(self["job"])
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop invocation_id : str
-
Get the identifier of the invocation request
Expand source code
@property def invocation_id(self) -> str: """Get the identifier of the invocation request""" return self["invocationId"]
prop invocation_schema_version : Literal['1.0', '2.0']
-
Get the schema version for the payload that Batch Operations sends when invoking an AWS Lambda function. Either '1.0' or '2.0'.
Expand source code
@property def invocation_schema_version(self) -> Literal["1.0", "2.0"]: """ Get the schema version for the payload that Batch Operations sends when invoking an AWS Lambda function. Either '1.0' or '2.0'. """ return self["invocationSchemaVersion"]
prop job : S3BatchOperationJob
-
Get the s3 batch operation job
Expand source code
@property def job(self) -> S3BatchOperationJob: """Get the s3 batch operation job""" return S3BatchOperationJob(self["job"])
prop task : S3BatchOperationTask
-
Get the first s3 batch operation task
Expand source code
@property def task(self) -> S3BatchOperationTask: """Get the first s3 batch operation task""" return next(self.tasks)
prop tasks : Iterator[S3BatchOperationTask]
-
Get s3 batch operation tasks
Expand source code
@property def tasks(self) -> Iterator[S3BatchOperationTask]: """Get s3 batch operation tasks""" for task in self["tasks"]: yield S3BatchOperationTask(task)
Inherited members
class S3BatchOperationResponse (invocation_schema_version: str, invocation_id: str, treat_missing_keys_as: RESULT_CODE_TYPE = 'Succeeded', results: list[S3BatchOperationResponseRecord] = <factory>)
-
S3 Batch Operations response object
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html
- https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions
- https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion
Parameters
invocation_schema_version
:str
- Specifies the schema version for the payload that Batch Operations sends when invoking an AWS Lambda function., either '1.0' or '2.0'. This must be copied from the event.
invocation_id
:str
- The identifier of the invocation request. This must be copied from the event.
treat_missing_keys_as
:Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
- Undocumented parameter, defaults to "Succeeded"
results
:list[S3BatchOperationResult]
- Results of each S3 Batch Operations task,
optional parameter at start. Can be added later using
add_result
function.
Examples
S3 Batch Operations
import boto3 from botocore.exceptions import ClientError from aws_lambda_powertools.utilities.data_classes import ( S3BatchOperationEvent, S3BatchOperationResponse, event_source ) from aws_lambda_powertools.utilities.typing import LambdaContext @event_source(data_class=S3BatchOperationEvent) def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext): response = S3BatchOperationResponse( event.invocation_schema_version, event.invocation_id, "PermanentFailure" ) result = None task = event.task src_key: str = task.s3_key src_bucket: str = task.s3_bucket s3 = boto3.client("s3", region_name='us-east-1') try: dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key) result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}") except ClientError as e: error_code = e.response['Error']['Code'] error_message = e.response['Error']['Message'] if error_code == 'RequestTimeout': result = task.build_task_batch_response("TemporaryFailure", "Timeout - trying again") else: result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}") except Exception as e: result = task.build_task_batch_response("PermanentFailure", str(e)) finally: response.add_result(result) return response.asdict()
Expand source code
@dataclass(repr=False, order=False) class S3BatchOperationResponse: """S3 Batch Operations response object Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions - https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion Parameters ---------- invocation_schema_version : str Specifies the schema version for the payload that Batch Operations sends when invoking an AWS Lambda function., either '1.0' or '2.0'. This must be copied from the event. invocation_id : str The identifier of the invocation request. This must be copied from the event. treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] Undocumented parameter, defaults to "Succeeded" results : list[S3BatchOperationResult] Results of each S3 Batch Operations task, optional parameter at start. Can be added later using `add_result` function. Examples -------- **S3 Batch Operations** ```python import boto3 from botocore.exceptions import ClientError from aws_lambda_powertools.utilities.data_classes import ( S3BatchOperationEvent, S3BatchOperationResponse, event_source ) from aws_lambda_powertools.utilities.typing import LambdaContext @event_source(data_class=S3BatchOperationEvent) def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext): response = S3BatchOperationResponse( event.invocation_schema_version, event.invocation_id, "PermanentFailure" ) result = None task = event.task src_key: str = task.s3_key src_bucket: str = task.s3_bucket s3 = boto3.client("s3", region_name='us-east-1') try: dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key) result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}") except ClientError as e: error_code = e.response['Error']['Code'] error_message = e.response['Error']['Message'] if error_code == 'RequestTimeout': result = task.build_task_batch_response("TemporaryFailure", "Timeout - trying again") else: result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}") except Exception as e: result = task.build_task_batch_response("PermanentFailure", str(e)) finally: response.add_result(result) return response.asdict() ``` """ invocation_schema_version: str invocation_id: str treat_missing_keys_as: RESULT_CODE_TYPE = "Succeeded" results: list[S3BatchOperationResponseRecord] = field(default_factory=list) def __post_init__(self): if self.treat_missing_keys_as not in VALID_RESULT_CODES: warnings.warn( stacklevel=2, message=f"The value {self.treat_missing_keys_as} is not valid for treat_missing_keys_as, " f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.", ) def add_result(self, result: S3BatchOperationResponseRecord): self.results.append(result) def asdict(self) -> dict: result_count = len(self.results) if result_count != 1: raise ValueError(f"Response must have exactly one result, but got {result_count}") return { "invocationSchemaVersion": self.invocation_schema_version, "treatMissingKeysAs": self.treat_missing_keys_as, "invocationId": self.invocation_id, "results": [result.asdict() for result in self.results], }
Class variables
var invocation_id : str
var invocation_schema_version : str
var results : list[S3BatchOperationResponseRecord]
var treat_missing_keys_as : Literal['Succeeded', 'TemporaryFailure', 'PermanentFailure']
Methods
def add_result(self, result: S3BatchOperationResponseRecord)
def asdict(self) ‑> dict
class S3BatchOperationResponseRecord (task_id: str, result_code: RESULT_CODE_TYPE, result_string: str | None = None)
-
S3BatchOperationResponseRecord(task_id: 'str', result_code: 'RESULT_CODE_TYPE', result_string: 'str | None' = None)
Expand source code
@dataclass(repr=False, order=False) class S3BatchOperationResponseRecord: task_id: str result_code: RESULT_CODE_TYPE result_string: str | None = None def asdict(self) -> dict[str, Any]: if self.result_code not in VALID_RESULT_CODES: warnings.warn( stacklevel=2, message=f"The resultCode {self.result_code} is not valid. " f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.", ) return { "taskId": self.task_id, "resultCode": self.result_code, "resultString": self.result_string, }
Class variables
var result_code : Literal['Succeeded', 'TemporaryFailure', 'PermanentFailure']
var result_string : str | None
var task_id : str
Methods
def asdict(self) ‑> dict[str, typing.Any]
class S3Event (data: dict[str, Any], json_deserializer: Callable | None = None)
-
S3 event notification
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
- https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
- https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class S3Event(DictWrapper): """S3 event notification Documentation: ------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html - https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html - https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html """ @property def records(self) -> Iterator[S3EventRecord]: for record in self["Records"]: yield S3EventRecord(record) @property def record(self) -> S3EventRecord: """Get the first s3 event record""" return next(self.records) @property def bucket_name(self) -> str: """Get the bucket name for the first s3 event record""" return self["Records"][0]["s3"]["bucket"]["name"] @property def object_key(self) -> str: """Get the object key for the first s3 event record and unquote plus""" return unquote_plus(self["Records"][0]["s3"]["object"]["key"])
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop bucket_name : str
-
Get the bucket name for the first s3 event record
Expand source code
@property def bucket_name(self) -> str: """Get the bucket name for the first s3 event record""" return self["Records"][0]["s3"]["bucket"]["name"]
prop object_key : str
-
Get the object key for the first s3 event record and unquote plus
Expand source code
@property def object_key(self) -> str: """Get the object key for the first s3 event record and unquote plus""" return unquote_plus(self["Records"][0]["s3"]["object"]["key"])
prop record : S3EventRecord
-
Get the first s3 event record
Expand source code
@property def record(self) -> S3EventRecord: """Get the first s3 event record""" return next(self.records)
prop records : Iterator[S3EventRecord]
-
Expand source code
@property def records(self) -> Iterator[S3EventRecord]: for record in self["Records"]: yield S3EventRecord(record)
Inherited members
class S3EventBridgeNotificationEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Amazon S3EventBridge Event
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class S3EventBridgeNotificationEvent(EventBridgeEvent): """Amazon S3EventBridge Event Documentation: -------------- - https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html """ @property def detail(self) -> S3EventBridgeNotificationDetail: # type: ignore[override] """S3 notification details""" return S3EventBridgeNotificationDetail(self["detail"])
Ancestors
- EventBridgeEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop detail : S3EventBridgeNotificationDetail
-
S3 notification details
Expand source code
@property def detail(self) -> S3EventBridgeNotificationDetail: # type: ignore[override] """S3 notification details""" return S3EventBridgeNotificationDetail(self["detail"])
Inherited members
class SESEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Amazon SES to receive message event trigger
NOTE: There is a 30-second timeout on RequestResponse invocations.
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/services-ses.html
- https://docs.aws.amazon.com/ses/latest/DeveloperGuide/receiving-email-action-lambda.html
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class SESEvent(DictWrapper): """Amazon SES to receive message event trigger NOTE: There is a 30-second timeout on RequestResponse invocations. Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/services-ses.html - https://docs.aws.amazon.com/ses/latest/DeveloperGuide/receiving-email-action-lambda.html """ @property def records(self) -> Iterator[SESEventRecord]: for record in self["Records"]: yield SESEventRecord(record) @property def record(self) -> SESEventRecord: return next(self.records) @property def mail(self) -> SESMail: return self.record.ses.mail @property def receipt(self) -> SESReceipt: return self.record.ses.receipt
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop mail : SESMail
-
Expand source code
@property def mail(self) -> SESMail: return self.record.ses.mail
prop receipt : SESReceipt
-
Expand source code
@property def receipt(self) -> SESReceipt: return self.record.ses.receipt
prop record : SESEventRecord
-
Expand source code
@property def record(self) -> SESEventRecord: return next(self.records)
prop records : Iterator[SESEventRecord]
-
Expand source code
@property def records(self) -> Iterator[SESEventRecord]: for record in self["Records"]: yield SESEventRecord(record)
Inherited members
class SNSEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
SNS Event
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class SNSEvent(DictWrapper): """SNS Event Documentation: ------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html """ @property def records(self) -> Iterator[SNSEventRecord]: for record in self["Records"]: yield SNSEventRecord(record) @property def record(self) -> SNSEventRecord: """Return the first SNS event record""" return next(self.records) @property def sns_message(self) -> str: """Return the message for the first sns event record""" return self.record.sns.message
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop record : SNSEventRecord
-
Return the first SNS event record
Expand source code
@property def record(self) -> SNSEventRecord: """Return the first SNS event record""" return next(self.records)
prop records : Iterator[SNSEventRecord]
-
Expand source code
@property def records(self) -> Iterator[SNSEventRecord]: for record in self["Records"]: yield SNSEventRecord(record)
prop sns_message : str
-
Return the message for the first sns event record
Expand source code
@property def sns_message(self) -> str: """Return the message for the first sns event record""" return self.record.sns.message
Inherited members
class SQSEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
SQS Event
Documentation:
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class SQSEvent(DictWrapper): """SQS Event Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html """ @property def records(self) -> Iterator[SQSRecord]: for record in self["Records"]: yield SQSRecord(data=record, json_deserializer=self._json_deserializer)
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop records : Iterator[SQSRecord]
-
Expand source code
@property def records(self) -> Iterator[SQSRecord]: for record in self["Records"]: yield SQSRecord(data=record, json_deserializer=self._json_deserializer)
Inherited members
class SecretsManagerEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class SecretsManagerEvent(DictWrapper): @property def secret_id(self) -> str: """SecretId: The secret ARN or identifier""" return self["SecretId"] @property def client_request_token(self) -> str: """ClientRequestToken: The ClientRequestToken associated with the secret version""" return self["ClientRequestToken"] @property def version_id(self) -> str: """Alias to ClientRequestToken to get token associated to version""" return self["ClientRequestToken"] @property def step(self) -> Literal["createSecret", "setSecret", "testSecret", "finishSecret"]: """Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)""" return self["Step"]
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop client_request_token : str
-
ClientRequestToken: The ClientRequestToken associated with the secret version
Expand source code
@property def client_request_token(self) -> str: """ClientRequestToken: The ClientRequestToken associated with the secret version""" return self["ClientRequestToken"]
prop secret_id : str
-
SecretId: The secret ARN or identifier
Expand source code
@property def secret_id(self) -> str: """SecretId: The secret ARN or identifier""" return self["SecretId"]
prop step : Literal['createSecret', 'setSecret', 'testSecret', 'finishSecret']
-
Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)
Expand source code
@property def step(self) -> Literal["createSecret", "setSecret", "testSecret", "finishSecret"]: """Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)""" return self["Step"]
prop version_id : str
-
Alias to ClientRequestToken to get token associated to version
Expand source code
@property def version_id(self) -> str: """Alias to ClientRequestToken to get token associated to version""" return self["ClientRequestToken"]
Inherited members
class VPCLatticeEvent (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class VPCLatticeEvent(VPCLatticeEventBase): @property def raw_path(self) -> str: """The raw VPC Lattice request path.""" return self["raw_path"] @property def is_base64_encoded(self) -> bool: """A boolean flag to indicate if the applicable request payload is Base64-encode""" return self["is_base64_encoded"] # VPCLattice event has no path field # Added here for consistency with the BaseProxyEvent class @property def path(self) -> str: return self["raw_path"] @property def query_string_parameters(self) -> dict[str, str]: """The request query string parameters.""" return self["query_string_parameters"] @cached_property def resolved_headers_field(self) -> dict[str, Any]: return CaseInsensitiveDict((k, v.split(",") if "," in v else v) for k, v in self.headers.items())
Ancestors
- VPCLatticeEventBase
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop is_base64_encoded : bool
-
A boolean flag to indicate if the applicable request payload is Base64-encode
Expand source code
@property def is_base64_encoded(self) -> bool: """A boolean flag to indicate if the applicable request payload is Base64-encode""" return self["is_base64_encoded"]
prop path : str
-
Expand source code
@property def path(self) -> str: return self["raw_path"]
prop query_string_parameters : dict[str, str]
-
The request query string parameters.
Expand source code
@property def query_string_parameters(self) -> dict[str, str]: """The request query string parameters.""" return self["query_string_parameters"]
prop raw_path : str
-
The raw VPC Lattice request path.
Expand source code
@property def raw_path(self) -> str: """The raw VPC Lattice request path.""" return self["raw_path"]
Inherited members
class VPCLatticeEventV2 (data: dict[str, Any], json_deserializer: Callable | None = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class VPCLatticeEventV2(VPCLatticeEventBase): @property def version(self) -> str: """The VPC Lattice v2 Event version""" return self["version"] @property def is_base64_encoded(self) -> bool | None: """A boolean flag to indicate if the applicable request payload is Base64-encode""" return self.get("isBase64Encoded") @property def path(self) -> str: """The VPC Lattice v2 Event path""" return self["path"] @property def request_context(self) -> vpcLatticeEventV2RequestContext: """The VPC Lattice v2 Event request context.""" return vpcLatticeEventV2RequestContext(self["requestContext"]) @cached_property def query_string_parameters(self) -> dict[str, str]: """The request query string parameters. For VPC Lattice V2, the queryStringParameters will contain a dict[str, list[str]] so to keep compatibility with existing utilities, we merge all the values with a comma. """ params = self.get("queryStringParameters") or {} return {k: ",".join(v) for k, v in params.items()} @property def resolved_headers_field(self) -> dict[str, str]: if self.headers is not None: return {key.lower(): value for key, value in self.headers.items()} return {}
Ancestors
- VPCLatticeEventBase
- BaseProxyEvent
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop is_base64_encoded : bool | None
-
A boolean flag to indicate if the applicable request payload is Base64-encode
Expand source code
@property def is_base64_encoded(self) -> bool | None: """A boolean flag to indicate if the applicable request payload is Base64-encode""" return self.get("isBase64Encoded")
prop path : str
-
The VPC Lattice v2 Event path
Expand source code
@property def path(self) -> str: """The VPC Lattice v2 Event path""" return self["path"]
var query_string_parameters
-
The request query string parameters.
For VPC Lattice V2, the queryStringParameters will contain a dict[str, list[str]] so to keep compatibility with existing utilities, we merge all the values with a comma.
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val
prop request_context : vpcLatticeEventV2RequestContext
-
The VPC Lattice v2 Event request context.
Expand source code
@property def request_context(self) -> vpcLatticeEventV2RequestContext: """The VPC Lattice v2 Event request context.""" return vpcLatticeEventV2RequestContext(self["requestContext"])
prop version : str
-
The VPC Lattice v2 Event version
Expand source code
@property def version(self) -> str: """The VPC Lattice v2 Event version""" return self["version"]
Inherited members