All examples shared in this documentation are available within the project repository.
You might have events that contain encoded JSON payloads as a string, as base64, or even in a compressed format. Decoding and extracting them, partially or fully, is a common step in a Lambda function invocation.
Powertools also has utilities such as validation, idempotency, and feature flags, where you might need to extract a portion of your data before using it.
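To make the three encodings mentioned above concrete, here is a minimal standard-library sketch. The `order` dictionary and the event keys (`body`, `encoded`, `compressed`) are made up for illustration; the sketch only shows what "decoding by hand" involves and does not use Powertools.

```python
import base64
import gzip
import json

# Hypothetical order used only for illustration
order = {"customerId": "dd4649e6", "quantity": 2}

# 1. JSON carried as a string inside another JSON document
as_json_string = json.dumps(order)

# 2. The same JSON string, base64 encoded
as_base64 = base64.b64encode(as_json_string.encode()).decode()

# 3. The same JSON string, gzip compressed and then base64 encoded
as_base64_gzip = base64.b64encode(gzip.compress(as_json_string.encode())).decode()

# Hypothetical event carrying each variant under a made-up key
event = {"body": as_json_string, "encoded": as_base64, "compressed": as_base64_gzip}

# Decoding each variant by hand with the standard library
from_json_string = json.loads(event["body"])
from_base64 = json.loads(base64.b64decode(event["encoded"]))
from_base64_gzip = json.loads(gzip.decompress(base64.b64decode(event["compressed"])))
```

All three decoded values equal the original `order`; the only difference is how many layers have to be peeled off first.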
Terminology
Envelope is the terminology we use for the JMESPath expression to extract your JSON object from your data input. We might use those two terms interchangeably.
You can use the extract_data_from_envelope function with any JMESPath expression.
Tip
Another common use case is to fetch deeply nested data, filter, flatten, and more.
from aws_lambda_powertools.utilities.jmespath_utils import extract_data_from_envelope
from aws_lambda_powertools.utilities.typing import LambdaContext


def handler(event: dict, context: LambdaContext) -> dict:
    payload = extract_data_from_envelope(data=event, envelope="powertools_json(body)")
    customer_id = payload.get("customerId")  # now deserialized

    # also works for fetching and flattening deeply nested data
    some_data = extract_data_from_envelope(data=event, envelope="deeply_nested[*].some_data[]")

    return {"customer_id": customer_id, "message": "success", "context": some_data, "statusCode": 200}
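As a rough illustration of what an expression such as `deeply_nested[*].some_data[]` selects, the sketch below reproduces the two steps (project, then flatten one level) with plain Python. The sample event is hypothetical, and the code is an approximation of the JMESPath semantics for this one expression, not a general replacement.

```python
# Hypothetical event; only the "deeply_nested" key matters here
event = {
    "deeply_nested": [
        {"some_data": [1, 2]},
        {"some_data": [3]},
        {"unrelated": "ignored"},  # no "some_data": dropped by the projection
    ],
}

# Step 1: `deeply_nested[*].some_data` projects `some_data` from each element,
# skipping elements where it is missing
projected = [item["some_data"] for item in event["deeply_nested"] if item.get("some_data") is not None]

# Step 2: the trailing `[]` flattens the result by one level
flattened = []
for value in projected:
    if isinstance(value, list):
        flattened.extend(value)
    else:
        flattened.append(value)
```

With this input, `projected` is a list of lists and `flattened` is a single flat list of the inner values.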
We provide built-in envelopes for popular AWS Lambda event sources to easily decode and/or deserialize JSON objects.
from __future__ import annotations

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.jmespath_utils import (
    envelopes,
    extract_data_from_envelope,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


def handler(event: dict, context: LambdaContext) -> dict:
    records: list = extract_data_from_envelope(data=event, envelope=envelopes.SQS)
    for record in records:  # records is a list
        logger.info(record.get("customerId"))  # now deserialized

    return {"message": "success", "statusCode": 200}
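For intuition, the following sketch unwraps an SQS-style event by hand. It assumes the SQS envelope behaves like an expression along the lines of `Records[*].powertools_json(body)`; that mapping is an assumption made for this illustration, and the sample event and the `unwrap_sqs_records` helper are hypothetical.

```python
import json

# Hypothetical, trimmed-down SQS event: each record body is a JSON string
sqs_event = {
    "Records": [
        {"messageId": "1", "body": json.dumps({"customerId": "dd4649e6"})},
        {"messageId": "2", "body": json.dumps({"customerId": "a1b2c3d4"})},
    ],
}


def unwrap_sqs_records(event: dict) -> list:
    """Deserialize the JSON string body of every record (illustrative helper)."""
    return [json.loads(record["body"]) for record in event["Records"]]


records = unwrap_sqs_records(sqs_event)
customer_ids = [record.get("customerId") for record in records]
```

The result is a list with one deserialized dictionary per record, which is why the handler above iterates over `records`.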
If you don't require SNS metadata, enable raw message delivery. It reduces the number of payload layers and the payload size when using SNS in combination with other services (e.g., SQS, S3, etc.).
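The difference in payload layers can be sketched with plain dictionaries. The notification below is a trimmed, hypothetical SNS message (real notifications carry more fields), and the topic ARN and helper names are made up for illustration.

```python
import json

order = {"customerId": "dd4649e6"}

# Without raw message delivery: the SQS body is the SNS notification (a JSON string),
# and the actual message is another JSON string under "Message"
sns_notification = {
    "Type": "Notification",
    "TopicArn": "arn:aws:sns:us-east-1:123456789012:orders",  # hypothetical ARN
    "Message": json.dumps(order),
}
wrapped_record = {"body": json.dumps(sns_notification)}

# With raw message delivery: the SQS body is the message itself
raw_record = {"body": json.dumps(order)}


def unwrap_wrapped(record: dict) -> dict:
    """Two layers: SQS body, then SNS Message."""
    return json.loads(json.loads(record["body"])["Message"])


def unwrap_raw(record: dict) -> dict:
    """One layer: SQS body only."""
    return json.loads(record["body"])


from_wrapped = unwrap_wrapped(wrapped_record)
from_raw = unwrap_raw(raw_record)
```

Both paths yield the same order, but the wrapped record needs an extra deserialization step and carries a larger body.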
You can use our built-in JMESPath functions within your envelope expression. They handle deserialization for common data formats found in AWS Lambda event sources, such as JSON strings, base64, and gzip-compressed data.
Info
We use these everywhere in Powertools to easily decode and unwrap events from Amazon API Gateway, Amazon Kinesis, Amazon CloudWatch Logs, etc.
import json
from dataclasses import asdict, dataclass, field, is_dataclass
from uuid import uuid4

import powertools_json_jmespath_schema as schemas
from jmespath.exceptions import JMESPathTypeError

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.validation import SchemaValidationError, validate


@dataclass
class Order:
    user_id: int
    product_id: int
    quantity: int
    price: float
    currency: str
    order_id: str = field(default_factory=lambda: f"{uuid4()}")


class DataclassCustomEncoder(json.JSONEncoder):
    """A custom JSON encoder to serialize dataclass obj"""

    def default(self, obj):
        # Only called for values that aren't JSON serializable
        # where `obj` will be an instance of Order in this example
        return asdict(obj) if is_dataclass(obj) else super().default(obj)


def lambda_handler(event, context: LambdaContext) -> dict:
    try:
        # Validate order against our schema
        validate(event=event, schema=schemas.INPUT, envelope="powertools_json(payload)")

        # Deserialize JSON string order as dict
        # alternatively, extract_data_from_envelope works here too
        order_payload: dict = json.loads(event.get("payload"))

        return {
            "order": json.dumps(Order(**order_payload), cls=DataclassCustomEncoder),
            "message": "order created",
            "success": True,
        }
    except JMESPathTypeError:
        # The powertools_json() envelope function must match a valid path
        return return_error_message("Invalid request.")
    except SchemaValidationError as exception:
        # SchemaValidationError indicates where a data mismatch is
        return return_error_message(str(exception))
    except json.JSONDecodeError:
        return return_error_message("Payload must be valid JSON (base64 encoded).")


def return_error_message(message: str) -> dict:
    return {"order": None, "message": message, "success": False}
INPUT = {
    "$schema": "http://json-schema.org/draft-07/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "title": "Sample order schema",
    "description": "The root schema comprises the entire JSON document.",
    "examples": [{"user_id": 123, "product_id": 1, "quantity": 2, "price": 10.40, "currency": "USD"}],
    "required": ["user_id", "product_id", "quantity", "price", "currency"],
    "properties": {
        "user_id": {
            "$id": "#/properties/user_id",
            "type": "integer",
            "title": "The unique identifier of the user",
            "examples": [123],
            "maxLength": 10,
        },
        "product_id": {
            "$id": "#/properties/product_id",
            "type": "integer",
            "title": "The unique identifier of the product",
            "examples": [1],
            "maxLength": 10,
        },
        "quantity": {
            "$id": "#/properties/quantity",
            "type": "integer",
            "title": "The quantity of the product",
            "examples": [2],
            "maxLength": 10,
        },
        "price": {
            "$id": "#/properties/price",
            "type": "number",
            "title": "The individual price of the product",
            "examples": [10.40],
            "maxLength": 10,
        },
        "currency": {
            "$id": "#/properties/currency",
            "type": "string",
            "title": "The currency",
            "examples": ["The currency of the order"],
            "maxLength": 100,
        },
    },
}
import json
from uuid import uuid4

import requests

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

# Treat everything under the "body" key
# in the event json object as our payload
config = IdempotencyConfig(event_key_jmespath="powertools_json(body)")


class PaymentError(Exception): ...


@idempotent(config=config, persistence_store=persistence_layer)
def handler(event, context) -> dict:
    body = json.loads(event["body"])
    try:
        payment: dict = create_subscription_payment(user=body["user"], product_id=body["product_id"])
        return {"payment_id": payment.get("id"), "message": "success", "statusCode": 200}
    except requests.HTTPError as e:
        raise PaymentError("Unable to create payment subscription") from e


def create_subscription_payment(user: str, product_id: str) -> dict:
    payload = {"user": user, "product_id": product_id}
    ret: requests.Response = requests.post(url="https://httpbin.org/anything", data=payload)
    ret.raise_for_status()

    return {"id": f"{uuid4()}", "message": "paid"}
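One reason to deserialize the body before using it as an idempotency payload is that two JSON strings can differ in whitespace or key order while describing the same data. The sketch below illustrates this with a hypothetical `payload_hash` helper; the hashing scheme (sorted-key JSON plus SHA-256) is an assumption chosen for the illustration and is not a description of how Powertools derives its keys.

```python
import hashlib
import json


def payload_hash(data) -> str:
    """Hash a JSON-serializable value deterministically (illustrative helper)."""
    serialized = json.dumps(data, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()


# Two request bodies describing the same payment, serialized differently
body_a = '{"user": "xyz", "product_id": "123"}'
body_b = '{"product_id":"123","user":"xyz"}'

# Hashing the raw strings treats them as different payloads
raw_hashes_match = payload_hash(body_a) == payload_hash(body_b)

# Hashing the deserialized objects treats them as the same payload
parsed_hashes_match = payload_hash(json.loads(body_a)) == payload_hash(json.loads(body_b))
```

Under these assumptions, only the deserialized form gives the same hash for both requests.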
import base64
import binascii
import json
from dataclasses import asdict, dataclass, field, is_dataclass
from uuid import uuid4

import powertools_base64_jmespath_schema as schemas
from jmespath.exceptions import JMESPathTypeError

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.validation import SchemaValidationError, validate


@dataclass
class Order:
    user_id: int
    product_id: int
    quantity: int
    price: float
    currency: str
    order_id: str = field(default_factory=lambda: f"{uuid4()}")


class DataclassCustomEncoder(json.JSONEncoder):
    """A custom JSON encoder to serialize dataclass obj"""

    def default(self, obj):
        # Only called for values that aren't JSON serializable
        # where `obj` will be an instance of Order in this example
        return asdict(obj) if is_dataclass(obj) else super().default(obj)


def lambda_handler(event, context: LambdaContext) -> dict:
    # Try to validate the schema
    try:
        validate(event=event, schema=schemas.INPUT, envelope="powertools_json(powertools_base64(payload))")

        # alternatively, extract_data_from_envelope works here too
        payload_decoded = base64.b64decode(event["payload"]).decode()

        order_payload: dict = json.loads(payload_decoded)

        return {
            "order": json.dumps(Order(**order_payload), cls=DataclassCustomEncoder),
            "message": "order created",
            "success": True,
        }
    except JMESPathTypeError:
        return return_error_message(
            "The powertools_json(powertools_base64()) envelope function must match a valid path.",
        )
    except binascii.Error:
        return return_error_message("Payload must be a valid base64 encoded string")
    except json.JSONDecodeError:
        return return_error_message("Payload must be valid JSON (base64 encoded).")
    except SchemaValidationError as exception:
        # SchemaValidationError indicates where a data mismatch is
        return return_error_message(str(exception))


def return_error_message(message: str) -> dict:
    return {"order": None, "message": message, "success": False}
INPUT = {
    "$schema": "http://json-schema.org/draft-07/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "title": "Sample order schema",
    "description": "The root schema comprises the entire JSON document.",
    "examples": [{"user_id": 123, "product_id": 1, "quantity": 2, "price": 10.40, "currency": "USD"}],
    "required": ["user_id", "product_id", "quantity", "price", "currency"],
    "properties": {
        "user_id": {
            "$id": "#/properties/user_id",
            "type": "integer",
            "title": "The unique identifier of the user",
            "examples": [123],
            "maxLength": 10,
        },
        "product_id": {
            "$id": "#/properties/product_id",
            "type": "integer",
            "title": "The unique identifier of the product",
            "examples": [1],
            "maxLength": 10,
        },
        "quantity": {
            "$id": "#/properties/quantity",
            "type": "integer",
            "title": "The quantity of the product",
            "examples": [2],
            "maxLength": 10,
        },
        "price": {
            "$id": "#/properties/price",
            "type": "number",
            "title": "The individual price of the product",
            "examples": [10.40],
            "maxLength": 10,
        },
        "currency": {
            "$id": "#/properties/currency",
            "type": "string",
            "title": "The currency",
            "examples": ["The currency of the order"],
            "maxLength": 100,
        },
    },
}
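The handler above separates base64 failures from JSON failures through distinct `except` branches. The standard-library sketch below shows which exception each kind of bad input raises; `classify_payload` is a hypothetical helper written for this illustration only.

```python
import base64
import binascii
import json


def classify_payload(payload: str) -> str:
    """Report which decoding step, if any, fails for a payload (illustrative helper)."""
    try:
        decoded = base64.b64decode(payload).decode()
        json.loads(decoded)
    except binascii.Error:
        # Raised by base64.b64decode, e.g. for incorrect padding
        return "invalid base64"
    except json.JSONDecodeError:
        # Raised by json.loads when the decoded text is not JSON
        return "invalid JSON"
    return "ok"


valid_payload = base64.b64encode(json.dumps({"user_id": 123}).encode()).decode()
not_json_payload = base64.b64encode(b"not json").decode()
bad_padding_payload = "abc"  # three characters: incorrect base64 padding

valid_result = classify_payload(valid_payload)
not_json_result = classify_payload(not_json_payload)
bad_padding_result = classify_payload(bad_padding_payload)
```

Payloads like these can serve as test events when checking that each error message is returned for the right reason.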
Use the powertools_base64_gzip function to decode base64 data and decompress it.
This sample decodes and decompresses base64 data from CloudWatch Logs, then uses a JMESPath pipe expression to pass the result on for JSON string decoding.
import base64
import binascii
import gzip
import json

import powertools_base64_gzip_jmespath_schema as schemas
from jmespath.exceptions import JMESPathTypeError

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.validation import SchemaValidationError, validate


def lambda_handler(event, context: LambdaContext) -> dict:
    try:
        validate(event=event, schema=schemas.INPUT, envelope="powertools_base64_gzip(payload) | powertools_json(@)")

        # Alternatively, extract_data_from_envelope works here too
        encoded_payload = base64.b64decode(event["payload"])
        uncompressed_payload = gzip.decompress(encoded_payload).decode()
        log: dict = json.loads(uncompressed_payload)

        return {
            "message": "Logs processed",
            "log_group": log.get("logGroup"),
            "owner": log.get("owner"),
            "success": True,
        }
    except JMESPathTypeError:
        return return_error_message("The powertools_base64_gzip() envelope function must match a valid path.")
    except binascii.Error:
        return return_error_message("Payload must be a valid base64 encoded string")
    except json.JSONDecodeError:
        return return_error_message("Payload must be valid JSON (base64 encoded).")
    except SchemaValidationError as exception:
        # SchemaValidationError indicates where a data mismatch is
        return return_error_message(str(exception))


def return_error_message(message: str) -> dict:
    return {"message": message, "success": False}
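To try a handler like the one above locally, you need a payload that is gzip compressed and then base64 encoded. The sketch below builds one with the standard library and decodes it again in the same order as the pipe expression (base64 and gzip first, JSON second). The log content and the two helper names are hypothetical.

```python
import base64
import gzip
import json

# Hypothetical log document, loosely shaped like a CloudWatch Logs payload
log = {"owner": "123456789012", "logGroup": "/aws/lambda/example", "logEvents": []}

# Build the event: JSON string -> gzip -> base64
compressed = gzip.compress(json.dumps(log).encode())
event = {"payload": base64.b64encode(compressed).decode()}


def from_base64_gzip(value: str) -> str:
    """Left side of the pipe: base64 decode, then gunzip, returning text."""
    return gzip.decompress(base64.b64decode(value)).decode()


def from_json(value: str) -> dict:
    """Right side of the pipe: deserialize the JSON string."""
    return json.loads(value)


# Mirrors `powertools_base64_gzip(payload) | powertools_json(@)` by hand
decoded_log = from_json(from_base64_gzip(event["payload"]))
```

The pipe in the envelope plays the role of the nested call here: the output of the left-hand function becomes the input (`@`) of the right-hand one.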
This should only be used for advanced use cases where you have special formats not covered by the built-in functions.
For special binary formats that you want to decode before applying JSON Schema validation, you can bring your own JMESPath function and any additional options via the jmespath_options parameter. To keep the Powertools built-in functions, subclass PowertoolsFunctions.
Here is an example of how to decompress messages using snappy:
import base64
import binascii

import snappy
from jmespath.exceptions import JMESPathTypeError
from jmespath.functions import signature

from aws_lambda_powertools.utilities.jmespath_utils import (
    PowertoolsFunctions,
    extract_data_from_envelope,
)


class CustomFunctions(PowertoolsFunctions):
    # only decode if value is a string
    # see supported data types: https://jmespath.org/specification.html#built-in-functions
    @signature({"types": ["string"]})
    def _func_decode_snappy_compression(self, payload: str):
        decoded: bytes = base64.b64decode(payload)
        return snappy.uncompress(decoded)


custom_jmespath_options = {"custom_functions": CustomFunctions()}


def lambda_handler(event, context) -> dict:
    try:
        logs = []
        logs.append(
            extract_data_from_envelope(
                data=event,
                # NOTE: the method is defined with the `_func_` prefix;
                # the expression calls it without the prefix
                envelope="Records[*].decode_snappy_compression(log)",
                jmespath_options=custom_jmespath_options,
            ),
        )
        return {"logs": logs, "message": "Extracted messages", "success": True}
    except JMESPathTypeError:
        return return_error_message("The envelope function must match a valid path.")
    except snappy.UncompressError:
        return return_error_message("Log must be a valid snappy compressed binary")
    except binascii.Error:
        return return_error_message("Log must be a valid base64 encoded string")


def return_error_message(message: str) -> dict:
    return {"logs": None, "message": message, "success": False}
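The naming convention in the example above (a method named `_func_<name>` that is called as `<name>` in the expression) can be sketched without any third-party dependency. The class below is a hypothetical stand-in, not the jmespath or Powertools API, and it swaps snappy for `zlib` from the standard library so that it runs as-is.

```python
import base64
import zlib


class SketchFunctions:
    """Hypothetical stand-in for a JMESPath functions class (not the jmespath API)."""

    def _func_decode_zlib_compression(self, payload: str) -> str:
        # Same two steps as the snappy example: base64 decode, then decompress
        decoded: bytes = base64.b64decode(payload)
        return zlib.decompress(decoded).decode()

    def call(self, name: str, *args):
        # Simplified dispatch: the expression uses `name`, the method is `_func_<name>`
        method = getattr(self, f"_func_{name}")
        return method(*args)


functions = SketchFunctions()

log_line = "user signed in"
encoded_log = base64.b64encode(zlib.compress(log_line.encode())).decode()

# Called by its expression name, without the `_func_` prefix
decoded_log = functions.call("decode_zlib_compression", encoded_log)
```

A real custom function would still subclass PowertoolsFunctions and be passed via jmespath_options as shown earlier; this sketch only illustrates the prefix convention and the decode-then-decompress order.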