Module aws_lambda_powertools.utilities.data_classes.s3_batch_operation_event
Classes
class S3BatchOperationEvent (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Amazon S3BatchOperation Event
Documentation:
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class S3BatchOperationEvent(DictWrapper): """Amazon S3BatchOperation Event Documentation: -------------- - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html """ @property def invocation_id(self) -> str: """Get the identifier of the invocation request""" return self["invocationId"] @property def invocation_schema_version(self) -> Literal["1.0", "2.0"]: """ Get the schema version for the payload that Batch Operations sends when invoking an AWS Lambda function. Either '1.0' or '2.0'. """ return self["invocationSchemaVersion"] @property def tasks(self) -> Iterator[S3BatchOperationTask]: """Get s3 batch operation tasks""" for task in self["tasks"]: yield S3BatchOperationTask(task) @property def task(self) -> S3BatchOperationTask: """Get the first s3 batch operation task""" return next(self.tasks) @property def job(self) -> S3BatchOperationJob: """Get the s3 batch operation job""" return S3BatchOperationJob(self["job"])
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
prop invocation_id : str
-
Get the identifier of the invocation request
Expand source code
@property def invocation_id(self) -> str: """Get the identifier of the invocation request""" return self["invocationId"]
prop invocation_schema_version : Literal['1.0', '2.0']
-
Get the schema version for the payload that Batch Operations sends when invoking an AWS Lambda function. Either '1.0' or '2.0'.
Expand source code
@property def invocation_schema_version(self) -> Literal["1.0", "2.0"]: """ Get the schema version for the payload that Batch Operations sends when invoking an AWS Lambda function. Either '1.0' or '2.0'. """ return self["invocationSchemaVersion"]
prop job : S3BatchOperationJob
-
Get the s3 batch operation job
Expand source code
@property def job(self) -> S3BatchOperationJob: """Get the s3 batch operation job""" return S3BatchOperationJob(self["job"])
prop task : S3BatchOperationTask
-
Get the first s3 batch operation task
Expand source code
@property def task(self) -> S3BatchOperationTask: """Get the first s3 batch operation task""" return next(self.tasks)
prop tasks : Iterator[S3BatchOperationTask]
-
Get s3 batch operation tasks
Expand source code
@property def tasks(self) -> Iterator[S3BatchOperationTask]: """Get s3 batch operation tasks""" for task in self["tasks"]: yield S3BatchOperationTask(task)
Inherited members
class S3BatchOperationJob (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class S3BatchOperationJob(DictWrapper): @property def get_id(self) -> str: # Note: this name conflicts with existing python builtins return self["id"] @property def user_arguments(self) -> Optional[Dict[str, str]]: """Get user arguments provided for this job (only for invocation schema 2.0)""" return self.get("userArguments")
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
prop get_id : str
-
Expand source code
@property def get_id(self) -> str: # Note: this name conflicts with existing python builtins return self["id"]
prop user_arguments : Optional[Dict[str, str]]
-
Get user arguments provided for this job (only for invocation schema 2.0)
Expand source code
@property def user_arguments(self) -> Optional[Dict[str, str]]: """Get user arguments provided for this job (only for invocation schema 2.0)""" return self.get("userArguments")
Inherited members
class S3BatchOperationResponse (invocation_schema_version: str, invocation_id: str, treat_missing_keys_as: Literal['Succeeded', 'TemporaryFailure', 'PermanentFailure'] = 'Succeeded', results: List[S3BatchOperationResponseRecord] = <factory>)
-
S3 Batch Operations response object
Documentation:
- https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html
- https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions
- https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion
Parameters
invocation_schema_version
:str
- Specifies the schema version for the payload that Batch Operations sends when invoking an AWS Lambda function., either '1.0' or '2.0'. This must be copied from the event.
invocation_id
:str
- The identifier of the invocation request. This must be copied from the event.
treat_missing_keys_as
:Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
- Undocumented parameter, defaults to "Succeeded"
results
:List[S3BatchOperationResult]
- Results of each S3 Batch Operations task,
optional parameter at start. Can be added later using
add_result
function.
Examples
S3 Batch Operations
import boto3 from botocore.exceptions import ClientError from aws_lambda_powertools.utilities.data_classes import ( S3BatchOperationEvent, S3BatchOperationResponse, event_source ) from aws_lambda_powertools.utilities.typing import LambdaContext @event_source(data_class=S3BatchOperationEvent) def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext): response = S3BatchOperationResponse( event.invocation_schema_version, event.invocation_id, "PermanentFailure" ) result = None task = event.task src_key: str = task.s3_key src_bucket: str = task.s3_bucket s3 = boto3.client("s3", region_name='us-east-1') try: dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key) result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}") except ClientError as e: error_code = e.response['Error']['Code'] error_message = e.response['Error']['Message'] if error_code == 'RequestTimeout': result = task.build_task_batch_response("TemporaryFailure", "Timeout - trying again") else: result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}") except Exception as e: result = task.build_task_batch_response("PermanentFailure", str(e)) finally: response.add_result(result) return response.asdict()
Expand source code
@dataclass(repr=False, order=False) class S3BatchOperationResponse: """S3 Batch Operations response object Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions - https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion Parameters ---------- invocation_schema_version : str Specifies the schema version for the payload that Batch Operations sends when invoking an AWS Lambda function., either '1.0' or '2.0'. This must be copied from the event. invocation_id : str The identifier of the invocation request. This must be copied from the event. treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] Undocumented parameter, defaults to "Succeeded" results : List[S3BatchOperationResult] Results of each S3 Batch Operations task, optional parameter at start. Can be added later using `add_result` function. Examples -------- **S3 Batch Operations** ```python import boto3 from botocore.exceptions import ClientError from aws_lambda_powertools.utilities.data_classes import ( S3BatchOperationEvent, S3BatchOperationResponse, event_source ) from aws_lambda_powertools.utilities.typing import LambdaContext @event_source(data_class=S3BatchOperationEvent) def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext): response = S3BatchOperationResponse( event.invocation_schema_version, event.invocation_id, "PermanentFailure" ) result = None task = event.task src_key: str = task.s3_key src_bucket: str = task.s3_bucket s3 = boto3.client("s3", region_name='us-east-1') try: dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key) result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}") except ClientError as e: error_code = e.response['Error']['Code'] error_message = e.response['Error']['Message'] if error_code == 'RequestTimeout': result = task.build_task_batch_response("TemporaryFailure", "Timeout - trying again") else: result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}") except Exception as e: result = task.build_task_batch_response("PermanentFailure", str(e)) finally: response.add_result(result) return response.asdict() ``` """ invocation_schema_version: str invocation_id: str treat_missing_keys_as: RESULT_CODE_TYPE = "Succeeded" results: List[S3BatchOperationResponseRecord] = field(default_factory=list) def __post_init__(self): if self.treat_missing_keys_as not in VALID_RESULT_CODES: warnings.warn( stacklevel=2, message=f"The value {self.treat_missing_keys_as} is not valid for treat_missing_keys_as, " f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.", ) def add_result(self, result: S3BatchOperationResponseRecord): self.results.append(result) def asdict(self) -> Dict: result_count = len(self.results) if result_count != 1: raise ValueError(f"Response must have exactly one result, but got {result_count}") return { "invocationSchemaVersion": self.invocation_schema_version, "treatMissingKeysAs": self.treat_missing_keys_as, "invocationId": self.invocation_id, "results": [result.asdict() for result in self.results], }
Class variables
var invocation_id : str
var invocation_schema_version : str
var results : List[S3BatchOperationResponseRecord]
var treat_missing_keys_as : Literal['Succeeded', 'TemporaryFailure', 'PermanentFailure']
Methods
def add_result(self, result: S3BatchOperationResponseRecord)
def asdict(self) ‑> Dict
class S3BatchOperationResponseRecord (task_id: str, result_code: Literal['Succeeded', 'TemporaryFailure', 'PermanentFailure'], result_string: Optional[str] = None)
-
S3BatchOperationResponseRecord(task_id: str, result_code: Literal['Succeeded', 'TemporaryFailure', 'PermanentFailure'], result_string: Optional[str] = None)
Expand source code
@dataclass(repr=False, order=False) class S3BatchOperationResponseRecord: task_id: str result_code: RESULT_CODE_TYPE result_string: Optional[str] = None def asdict(self) -> Dict[str, Any]: if self.result_code not in VALID_RESULT_CODES: warnings.warn( stacklevel=2, message=f"The resultCode {self.result_code} is not valid. " f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.", ) return { "taskId": self.task_id, "resultCode": self.result_code, "resultString": self.result_string, }
Class variables
var result_code : Literal['Succeeded', 'TemporaryFailure', 'PermanentFailure']
var result_string : Optional[str]
var task_id : str
Methods
def asdict(self) ‑> Dict[str, Any]
class S3BatchOperationTask (data: Dict[str, Any], json_deserializer: Optional[Callable] = None)
-
Provides a single read only access to a wrapper dict
Parameters
data
:Dict[str, Any]
- Lambda Event Source Event payload
json_deserializer
:Callable
, optional- function to deserialize
str
,bytes
,bytearray
containing a JSON document to a Pythonobj
, by default json.loads
Expand source code
class S3BatchOperationTask(DictWrapper): @property def task_id(self) -> str: """Get the task id""" return self["taskId"] @property def s3_key(self) -> str: """Get the object key using unquote_plus""" return unquote_plus(self["s3Key"]) @property def s3_version_id(self) -> Optional[str]: """Object version if bucket is versioning-enabled, otherwise null""" return self.get("s3VersionId") @property def s3_bucket_arn(self) -> Optional[str]: """Get the s3 bucket arn (present only for invocationSchemaVersion '1.0')""" return self.get("s3BucketArn") @property def s3_bucket(self) -> str: """ Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0') or from 's3BucketArn' (invocationSchemaVersion '1.0') """ if self.s3_bucket_arn: return self.s3_bucket_arn.split(":::")[-1] return self["s3Bucket"] def build_task_batch_response( self, result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded", result_string: str = "", ) -> S3BatchOperationResponseRecord: """Create a S3BatchOperationResponseRecord directly using the task_id and given values Parameters ---------- result_code : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded" task result, supported value: "Succeeded", "TemporaryFailure", "PermanentFailure" result_string : str string to identify in the report """ return S3BatchOperationResponseRecord( task_id=self.task_id, result_code=result_code, result_string=result_string, )
Ancestors
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Instance variables
prop s3_bucket : str
-
Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0') or from 's3BucketArn' (invocationSchemaVersion '1.0')
Expand source code
@property def s3_bucket(self) -> str: """ Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0') or from 's3BucketArn' (invocationSchemaVersion '1.0') """ if self.s3_bucket_arn: return self.s3_bucket_arn.split(":::")[-1] return self["s3Bucket"]
prop s3_bucket_arn : Optional[str]
-
Get the s3 bucket arn (present only for invocationSchemaVersion '1.0')
Expand source code
@property def s3_bucket_arn(self) -> Optional[str]: """Get the s3 bucket arn (present only for invocationSchemaVersion '1.0')""" return self.get("s3BucketArn")
prop s3_key : str
-
Get the object key using unquote_plus
Expand source code
@property def s3_key(self) -> str: """Get the object key using unquote_plus""" return unquote_plus(self["s3Key"])
prop s3_version_id : Optional[str]
-
Object version if bucket is versioning-enabled, otherwise null
Expand source code
@property def s3_version_id(self) -> Optional[str]: """Object version if bucket is versioning-enabled, otherwise null""" return self.get("s3VersionId")
prop task_id : str
-
Get the task id
Expand source code
@property def task_id(self) -> str: """Get the task id""" return self["taskId"]
Methods
def build_task_batch_response(self, result_code: Literal['Succeeded', 'TemporaryFailure', 'PermanentFailure'] = 'Succeeded', result_string: str = '') ‑> S3BatchOperationResponseRecord
-
Create a S3BatchOperationResponseRecord directly using the task_id and given values
Parameters
result_code
:Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded"
- task result, supported value: "Succeeded", "TemporaryFailure", "PermanentFailure"
result_string
:str
- string to identify in the report
Inherited members