JMESPath Functions

Tip

JMESPath is a query language for JSON used by AWS CLI, AWS Python SDK, and AWS Lambda Powertools for Python.

This utility provides built-in JMESPath functions to easily deserialize commonly encoded JSON payloads in Lambda functions.

Key features

  • Deserialize JSON from JSON strings, base64, and compressed data
  • Use JMESPath to extract and combine data recursively
  • Provides commonly used JMESPath expressions for popular event sources

Getting started

Tip

All examples shared in this documentation are available within the project repository.

You might have events that contain encoded JSON payloads as a string, as base64, or even in a compressed format. It is a common use case to decode and extract them partially or fully as part of your Lambda function invocation.

Powertools also has utilities like validation, idempotency, or feature flags where you might need to extract a portion of your data before using them.

Terminology

Envelope is the term we use for the JMESPath expression that extracts your JSON object from your data input. We might use the two terms, envelope and JMESPath expression, interchangeably.

Extracting data

You can use the extract_data_from_envelope function with any JMESPath expression.

Tip

Another common use case is to fetch deeply nested data, filter, flatten, and more.

from aws_lambda_powertools.utilities.jmespath_utils import extract_data_from_envelope
from aws_lambda_powertools.utilities.typing import LambdaContext


def handler(event: dict, context: LambdaContext) -> dict:
    payload = extract_data_from_envelope(data=event, envelope="powertools_json(body)")
    customer_id = payload.get("customerId")  # now deserialized

    # also works for fetching and flattening deeply nested data
    some_data = extract_data_from_envelope(data=event, envelope="deeply_nested[*].some_data[]")

    return {"customer_id": customer_id, "message": "success", "context": some_data, "statusCode": 200}
{
    "body": "{\"customerId\":\"dd4649e6-2484-4993-acb8-0f9123103394\"}",
    "deeply_nested": [
        {
            "some_data": [
                1,
                2,
                3
            ]
        }
    ]
}
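
To make the two expressions above concrete, here is a stdlib-only sketch of roughly what they evaluate to for this sample event. It does not call Powertools or JMESPath: `json.loads` and the list comprehension are stand-ins assumed to mirror `powertools_json(body)` and `deeply_nested[*].some_data[]`.

```python
import json

# Same sample event as above
event = {
    "body": "{\"customerId\":\"dd4649e6-2484-4993-acb8-0f9123103394\"}",
    "deeply_nested": [{"some_data": [1, 2, 3]}],
}

# Stand-in for powertools_json(body): parse the JSON string into a dict
payload = json.loads(event["body"])
customer_id = payload.get("customerId")

# Stand-in for deeply_nested[*].some_data[]: project over the list, then flatten one level
some_data = [item for entry in event["deeply_nested"] for item in entry["some_data"]]

print(customer_id)
print(some_data)
```

The point of the envelope is that both steps collapse into a single expression string, so the handler does not need its own parsing code.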

Built-in envelopes

We provide built-in envelopes for popular AWS Lambda event sources to easily decode and/or deserialize JSON objects.

from aws_lambda_powertools.utilities.jmespath_utils import (
    envelopes,
    extract_data_from_envelope,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


def handler(event: dict, context: LambdaContext) -> dict:
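    # envelopes.SQS is a projection (Records[*]...), so it yields a list: one deserialized body per record
    records: list = extract_data_from_envelope(data=event, envelope=envelopes.SQS)
    customer_id = records[0].get("customerId")  # first record, now deserialized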

    return {"customer_id": customer_id, "message": "success", "statusCode": 200}
{
    "Records": [
        {
            "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
            "receiptHandle": "MessageReceiptHandle",
            "body": "{\"customerId\":\"dd4649e6-2484-4993-acb8-0f9123103394\",\"booking\":{\"id\":\"5b2c4803-330b-42b7-811a-c68689425de1\",\"reference\":\"ySz7oA\",\"outboundFlightId\":\"20c0d2f2-56a3-4068-bf20-ff7703db552d\"},\"payment\":{\"receipt\":\"https:\/\/pay.stripe.com\/receipts\/acct_1Dvn7pF4aIiftV70\/ch_3JTC14F4aIiftV700iFq2CHB\/rcpt_K7QsrFln9FgFnzUuBIiNdkkRYGxUL0X\",\"amount\":100}}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1523232000000",
                "SenderId": "123456789012",
                "ApproximateFirstReceiveTimestamp": "1523232000001"
            },
            "messageAttributes": {},
            "md5OfBody": "7b270e59b47ff90a553787216d55d91d",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue",
            "awsRegion": "us-east-1"
        }
    ]
}
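
Because the SQS expression projects over `Records[*]`, its result is a list with one entry per record rather than a single dict. A stdlib-only sketch of that shape, using a trimmed-down event with made-up values (`json.loads` stands in for `powertools_json`; this is not the Powertools implementation):

```python
import json

# Trimmed-down SQS event with two records (values are made up for illustration)
event = {
    "Records": [
        {"messageId": "1", "body": "{\"customerId\": \"abc\"}"},
        {"messageId": "2", "body": "{\"customerId\": \"def\"}"},
    ]
}

# Stand-in for Records[*].powertools_json(body): one parsed body per record
payloads = [json.loads(record["body"]) for record in event["Records"]]
customer_ids = [payload.get("customerId") for payload in payloads]

print(customer_ids)
```

When a batch contains several messages, iterate over the extracted list instead of assuming a single payload.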

These are all the built-in envelopes you can use, along with their expressions for reference:

Envelope                     JMESPath expression
---------------------------  ----------------------------------------------------------------------
API_GATEWAY_REST             powertools_json(body)
API_GATEWAY_HTTP             API_GATEWAY_REST
SQS                          Records[*].powertools_json(body)
SNS                          Records[0].Sns.Message | powertools_json(@)
EVENTBRIDGE                  detail
CLOUDWATCH_EVENTS_SCHEDULED  EVENTBRIDGE
KINESIS_DATA_STREAM          Records[*].kinesis.powertools_json(powertools_base64(data))
CLOUDWATCH_LOGS              awslogs.powertools_base64_gzip(data) | powertools_json(@).logEvents[*]
Advanced

Built-in JMESPath functions

You can use our built-in JMESPath functions within your envelope expression. They handle deserialization for common data formats found in AWS Lambda event sources, such as JSON strings, base64, and gzip-compressed data.

Info

We use these everywhere in Powertools to easily decode and unwrap events from Amazon API Gateway, Amazon Kinesis, Amazon CloudWatch Logs, etc.

powertools_json function

Use the powertools_json function to decode any JSON string anywhere a JMESPath expression is allowed.

Validation scenario

This sample will deserialize the JSON string within the payload key before validation.

import json
from dataclasses import asdict, dataclass, field, is_dataclass
from uuid import uuid4

import powertools_json_jmespath_schema as schemas
from jmespath.exceptions import JMESPathTypeError

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.validation import SchemaValidationError, validate


@dataclass
class Order:
    user_id: int
    product_id: int
    quantity: int
    price: float
    currency: str
    order_id: str = field(default_factory=lambda: f"{uuid4()}")


class DataclassCustomEncoder(json.JSONEncoder):
    """A custom JSON encoder to serialize dataclass obj"""

    def default(self, obj):
        # Only called for values that aren't JSON serializable
        # where `obj` will be an instance of Order in this example
        return asdict(obj) if is_dataclass(obj) else super().default(obj)


def lambda_handler(event, context: LambdaContext) -> dict:
    try:
        # Validate order against our schema
        validate(event=event, schema=schemas.INPUT, envelope="powertools_json(payload)")

        # Deserialize JSON string order as dict
        # alternatively, extract_data_from_envelope works here too
        order_payload: dict = json.loads(event.get("payload"))

        return {
            "order": json.dumps(Order(**order_payload), cls=DataclassCustomEncoder),
            "message": "order created",
            "success": True,
        }
    except JMESPathTypeError:
        # The powertools_json() envelope function must match a valid path
        return return_error_message("Invalid request.")
    except SchemaValidationError as exception:
        # SchemaValidationError indicates where a data mismatch is
        return return_error_message(str(exception))
    except json.JSONDecodeError:
        return return_error_message("Payload must be valid JSON.")


def return_error_message(message: str) -> dict:
    return {"order": None, "message": message, "success": False}
INPUT = {
    "$schema": "http://json-schema.org/draft-07/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "title": "Sample order schema",
    "description": "The root schema comprises the entire JSON document.",
    "examples": [{"user_id": 123, "product_id": 1, "quantity": 2, "price": 10.40, "currency": "USD"}],
    "required": ["user_id", "product_id", "quantity", "price", "currency"],
    "properties": {
        "user_id": {
            "$id": "#/properties/user_id",
            "type": "integer",
            "title": "The unique identifier of the user",
            "examples": [123],
            "maxLength": 10,
        },
        "product_id": {
            "$id": "#/properties/product_id",
            "type": "integer",
            "title": "The unique identifier of the product",
            "examples": [1],
            "maxLength": 10,
        },
        "quantity": {
            "$id": "#/properties/quantity",
            "type": "integer",
            "title": "The quantity of the product",
            "examples": [2],
            "maxLength": 10,
        },
        "price": {
            "$id": "#/properties/price",
            "type": "number",
            "title": "The individual price of the product",
            "examples": [10.40],
            "maxLength": 10,
        },
        "currency": {
            "$id": "#/properties/currency",
            "type": "string",
            "title": "The currency",
            "examples": ["The currency of the order"],
            "maxLength": 100,
        },
    },
}
{
    "payload":"{\"user_id\": 123, \"product_id\": 1, \"quantity\": 2, \"price\": 10.40, \"currency\": \"USD\"}"
}

Idempotency scenario

This sample will deserialize the JSON string within the body key before Idempotency processes it.

import json
from uuid import uuid4

import requests

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")

# Treat everything under the "body" key
# in the event json object as our payload
config = IdempotencyConfig(event_key_jmespath="powertools_json(body)")


class PaymentError(Exception):
    ...


@idempotent(config=config, persistence_store=persistence_layer)
def handler(event, context) -> dict:
    body = json.loads(event["body"])
    try:
        payment: dict = create_subscription_payment(user=body["user"], product_id=body["product_id"])
        return {"payment_id": payment.get("id"), "message": "success", "statusCode": 200}
    except requests.HTTPError as e:
        raise PaymentError("Unable to create payment subscription") from e


def create_subscription_payment(user: str, product_id: str) -> dict:
    payload = {"user": user, "product_id": product_id}
    ret: requests.Response = requests.post(url="https://httpbin.org/anything", data=payload)
    ret.raise_for_status()

    return {"id": f"{uuid4()}", "message": "paid"}
{
    "version":"2.0",
    "routeKey":"ANY /createpayment",
    "rawPath":"/createpayment",
    "rawQueryString":"",
    "headers": {
      "Header1": "value1",
      "Header2": "value2"
    },
    "requestContext":{
      "accountId":"123456789012",
      "apiId":"api-id",
      "domainName":"id.execute-api.us-east-1.amazonaws.com",
      "domainPrefix":"id",
      "http":{
        "method":"POST",
        "path":"/createpayment",
        "protocol":"HTTP/1.1",
        "sourceIp":"ip",
        "userAgent":"agent"
      },
      "requestId":"id",
      "routeKey":"ANY /createpayment",
      "stage":"$default",
      "time":"10/Feb/2021:13:40:43 +0000",
      "timeEpoch":1612964443723
    },
    "body":"{\"user\":\"xyz\",\"product_id\":\"123456789\"}",
    "isBase64Encoded":false
  }
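
One way to see why deserializing the body first can matter for idempotency: a raw JSON string is sensitive to whitespace and key order, while the parsed object is not. The sketch below assumes the idempotency key is a hash of a canonical JSON dump of the selected value. The `fingerprint` helper and the choice of SHA-256 are hypothetical illustrations, not Powertools internals.

```python
import hashlib
import json

# Two requests carrying the same data, serialized differently by the client
body_a = "{\"user\":\"xyz\",\"product_id\":\"123456789\"}"
body_b = "{\"product_id\": \"123456789\", \"user\": \"xyz\"}"


def fingerprint(data) -> str:
    """Hypothetical helper: hash a value after a canonical (sorted-keys) JSON dump."""
    canonical = json.dumps(data, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()


# Hashing the raw strings: whitespace and key order change the hash
raw_match = fingerprint(body_a) == fingerprint(body_b)

# Hashing the deserialized bodies (stand-in for powertools_json(body)): same hash
parsed_match = fingerprint(json.loads(body_a)) == fingerprint(json.loads(body_b))

print(raw_match, parsed_match)
```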

powertools_base64 function

Use the powertools_base64 function to decode any base64 data.

This sample will decode the base64 value within the payload key, and deserialize the JSON string before validation.

import base64
import binascii
import json
from dataclasses import asdict, dataclass, field, is_dataclass
from uuid import uuid4

import powertools_base64_jmespath_schema as schemas
from jmespath.exceptions import JMESPathTypeError

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.validation import SchemaValidationError, validate


@dataclass
class Order:
    user_id: int
    product_id: int
    quantity: int
    price: float
    currency: str
    order_id: str = field(default_factory=lambda: f"{uuid4()}")


class DataclassCustomEncoder(json.JSONEncoder):
    """A custom JSON encoder to serialize dataclass obj"""

    def default(self, obj):
        # Only called for values that aren't JSON serializable
        # where `obj` will be an instance of Order in this example
        return asdict(obj) if is_dataclass(obj) else super().default(obj)


def lambda_handler(event, context: LambdaContext) -> dict:

    # Try to validate the schema
    try:
        validate(event=event, schema=schemas.INPUT, envelope="powertools_json(powertools_base64(payload))")

        # alternatively, extract_data_from_envelope works here too
        payload_decoded = base64.b64decode(event["payload"]).decode()

        order_payload: dict = json.loads(payload_decoded)

        return {
            "order": json.dumps(Order(**order_payload), cls=DataclassCustomEncoder),
            "message": "order created",
            "success": True,
        }
    except JMESPathTypeError:
        return return_error_message(
            "The powertools_json(powertools_base64()) envelope function must match a valid path."
        )
    except binascii.Error:
        return return_error_message("Payload must be a valid base64 encoded string")
    except json.JSONDecodeError:
        return return_error_message("Payload must be valid JSON (base64 encoded).")
    except SchemaValidationError as exception:
        # SchemaValidationError indicates where a data mismatch is
        return return_error_message(str(exception))


def return_error_message(message: str) -> dict:
    return {"order": None, "message": message, "success": False}
INPUT = {
    "$schema": "http://json-schema.org/draft-07/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "title": "Sample order schema",
    "description": "The root schema comprises the entire JSON document.",
    "examples": [{"user_id": 123, "product_id": 1, "quantity": 2, "price": 10.40, "currency": "USD"}],
    "required": ["user_id", "product_id", "quantity", "price", "currency"],
    "properties": {
        "user_id": {
            "$id": "#/properties/user_id",
            "type": "integer",
            "title": "The unique identifier of the user",
            "examples": [123],
            "maxLength": 10,
        },
        "product_id": {
            "$id": "#/properties/product_id",
            "type": "integer",
            "title": "The unique identifier of the product",
            "examples": [1],
            "maxLength": 10,
        },
        "quantity": {
            "$id": "#/properties/quantity",
            "type": "integer",
            "title": "The quantity of the product",
            "examples": [2],
            "maxLength": 10,
        },
        "price": {
            "$id": "#/properties/price",
            "type": "number",
            "title": "The individual price of the product",
            "examples": [10.40],
            "maxLength": 10,
        },
        "currency": {
            "$id": "#/properties/currency",
            "type": "string",
            "title": "The currency",
            "examples": ["The currency of the order"],
            "maxLength": 100,
        },
    },
}
{
    "payload":"eyJ1c2VyX2lkIjogMTIzLCAicHJvZHVjdF9pZCI6IDEsICJxdWFudGl0eSI6IDIsICJwcmljZSI6IDEwLjQwLCAiY3VycmVuY3kiOiAiVVNEIn0="
}
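
The nested expression `powertools_json(powertools_base64(payload))` can be read inside-out. Here is a stdlib-only round trip as a sketch: the event is built in code rather than copied from the sample, and `base64.b64decode` and `json.loads` are assumed stand-ins for the two Powertools functions.

```python
import base64
import json

order = {"user_id": 123, "product_id": 1, "quantity": 2, "price": 10.4, "currency": "USD"}

# Build an event shaped like the sample above: JSON string, then base64
event = {"payload": base64.b64encode(json.dumps(order).encode()).decode()}

# Stand-in for powertools_base64(payload): base64 -> UTF-8 string
decoded_str = base64.b64decode(event["payload"]).decode()

# Stand-in for the outer powertools_json(...): string -> dict
decoded_order = json.loads(decoded_str)

print(decoded_order)
```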

powertools_base64_gzip function

Use the powertools_base64_gzip function to decompress and decode base64 data.

This sample will decompress and decode base64 data from CloudWatch Logs, then use a JMESPath pipe expression to pass the result on for decoding its JSON string.

import base64
import binascii
import gzip
import json

import powertools_base64_gzip_jmespath_schema as schemas
from jmespath.exceptions import JMESPathTypeError

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.validation import SchemaValidationError, validate


def lambda_handler(event, context: LambdaContext) -> dict:
    try:
        validate(event=event, schema=schemas.INPUT, envelope="powertools_base64_gzip(payload) | powertools_json(@)")

        # Alternatively, extract_data_from_envelope works here too
        encoded_payload = base64.b64decode(event["payload"])
        uncompressed_payload = gzip.decompress(encoded_payload).decode()
        log: dict = json.loads(uncompressed_payload)

        return {
            "message": "Logs processed",
            "log_group": log.get("logGroup"),
            "owner": log.get("owner"),
            "success": True,
        }

    except JMESPathTypeError:
        return return_error_message("The powertools_base64_gzip() envelope function must match a valid path.")
    except binascii.Error:
        return return_error_message("Payload must be a valid base64 encoded string")
    except json.JSONDecodeError:
        return return_error_message("Payload must be valid JSON (base64 encoded).")
    except SchemaValidationError as exception:
        # SchemaValidationError indicates where a data mismatch is
        return return_error_message(str(exception))


def return_error_message(message: str) -> dict:
    return {"message": message, "success": False}
INPUT = {
    "$schema": "http://json-schema.org/draft-07/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "title": "Sample schema",
    "description": "The root schema comprises the entire JSON document.",
    "examples": [
        {
            "owner": "123456789012",
            "logGroup": "/aws/lambda/powertools-example",
            "logStream": "2022/08/07/[$LATEST]d3a8dcaffc7f4de2b8db132e3e106660",
            "logEvents": [],
        }
    ],
    "required": ["owner", "logGroup", "logStream", "logEvents"],
    "properties": {
        "owner": {
            "$id": "#/properties/owner",
            "type": "string",
            "title": "The owner",
            "examples": ["123456789012"],
            "maxLength": 12,
        },
        "logGroup": {
            "$id": "#/properties/logGroup",
            "type": "string",
            "title": "The logGroup",
            "examples": ["/aws/lambda/powertools-example"],
            "maxLength": 100,
        },
        "logStream": {
            "$id": "#/properties/logStream",
            "type": "string",
            "title": "The logStream",
            "examples": ["2022/08/07/[$LATEST]d3a8dcaffc7f4de2b8db132e3e106660"],
            "maxLength": 100,
        },
        "logEvents": {
            "$id": "#/properties/logEvents",
            "type": "array",
            "title": "The logEvents",
            "examples": [
                "{'id': 'eventId1', 'message': {'username': 'lessa', 'message': 'hello world'}, 'timestamp': 1440442987000}"  # noqa E501
            ],
        },
    },
}
{
    "payload": "H4sIACZAXl8C/52PzUrEMBhFX2UILpX8tPbHXWHqIOiq3Q1F0ubrWEiakqTWofTdTYYB0YWL2d5zvnuTFellBIOedoiyKH5M0iwnlKH7HZL6dDB6ngLDfLFYctUKjie9gHFaS/sAX1xNEq525QxwFXRGGMEkx4Th491rUZdV3YiIZ6Ljfd+lfSyAtZloacQgAkqSJCGhxM6t7cwwuUGPz4N0YKyvO6I9WDeMPMSo8Z4Ca/kJ6vMEYW5f1MX7W1lVxaG8vqX8hNFdjlc0iCBBSF4ERT/3Pl7RbMGMXF2KZMh/C+gDpNS7RRsp0OaRGzx0/t8e0jgmcczyLCWEePhni/23JWalzjdu0a3ZvgEaNLXeugEAAA=="
}
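
The layering in this payload is JSON, then gzip, then base64, and the expression undoes it in reverse. A stdlib-only round trip as a sketch: the log record is a made-up, trimmed example, and the stdlib calls are assumed stand-ins for `powertools_base64_gzip` and `powertools_json`.

```python
import base64
import gzip
import json

log = {"owner": "123456789012", "logGroup": "/aws/lambda/powertools-example", "logEvents": []}

# Producer side: JSON -> gzip -> base64
event = {"payload": base64.b64encode(gzip.compress(json.dumps(log).encode())).decode()}

# Stand-in for powertools_base64_gzip(payload): base64 -> gunzip -> str
uncompressed = gzip.decompress(base64.b64decode(event["payload"])).decode()

# Stand-in for `| powertools_json(@)`: the piped string becomes a dict
decoded_log = json.loads(uncompressed)

print(decoded_log["logGroup"])
```

The pipe form `a | powertools_json(@)` and the nested form `powertools_json(a)` express the same order of operations; `@` refers to the value produced on the left-hand side of the pipe.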

Bring your own JMESPath function

Warning

This should only be used for advanced use cases where you have special formats not covered by the built-in functions.

For special binary formats that you want to decode before applying JSON Schema validation, you can bring your own JMESPath function and any additional options via the jmespath_options parameter. To keep the Powertools built-in functions, subclass PowertoolsFunctions.

Here is an example of how to decompress messages using snappy:

import base64
import binascii

import snappy
from jmespath.exceptions import JMESPathTypeError
from jmespath.functions import signature

from aws_lambda_powertools.utilities.jmespath_utils import (
    PowertoolsFunctions,
    extract_data_from_envelope,
)


class CustomFunctions(PowertoolsFunctions):
    # only decode if value is a string
    # see supported data types: https://jmespath.org/specification.html#built-in-functions
    @signature({"types": ["string"]})
    def _func_decode_snappy_compression(self, payload: str):
        decoded: bytes = base64.b64decode(payload)
        return snappy.uncompress(decoded)


custom_jmespath_options = {"custom_functions": CustomFunctions()}


def lambda_handler(event, context) -> dict:

    try:
        logs = []
        logs.append(
            extract_data_from_envelope(
                data=event,
                # NOTE: the method is defined with the `_func_` prefix; the expression calls it without the prefix
                envelope="Records[*].decode_snappy_compression(log)",
                jmespath_options=custom_jmespath_options,
            )
        )
        return {"logs": logs, "message": "Extracted messages", "success": True}
    except JMESPathTypeError:
        return return_error_message("The envelope function must match a valid path.")
    except snappy.UncompressError:
        return return_error_message("Log must be a valid snappy compressed binary")
    except binascii.Error:
        return return_error_message("Log must be a valid base64 encoded string")


def return_error_message(message: str) -> dict:
    return {"logs": None, "message": message, "success": False}
{
    "Records": [
        {
            "user": "integration-kafka",
            "datetime": "2022-01-01T00:00:00.000Z",
            "log": "/QGIMjAyMi8wNi8xNiAxNjoyNTowMCBbY3JpdF0gMzA1MTg5MCMNCPBEOiAqMSBjb25uZWN0KCkg\ndG8gMTI3LjAuMC4xOjUwMDAgZmFpbGVkICgxMzogUGVybWlzc2lvbiBkZW5pZWQpIHdoaWxlEUEI\naW5nAUJAdXBzdHJlYW0sIGNsaWVudDoZVKgsIHNlcnZlcjogXywgcmVxdWVzdDogIk9QVElPTlMg\nLyBIVFRQLzEuMSIsFUckOiAiaHR0cDovLzabABQvIiwgaG8FQDAxMjcuMC4wLjE6ODEi\n"
        },
        {
            "user": "integration-kafka",
            "datetime": "2022-01-01T00:00:01.000Z",
            "log": "tQHwnDEyNy4wLjAuMSAtIC0gWzE2L0p1bi8yMDIyOjE2OjMwOjE5ICswMTAwXSAiT1BUSU9OUyAv\nIEhUVFAvMS4xIiAyMDQgMCAiLSIgIk1vemlsbGEvNS4wIChYMTE7IExpbnV4IHg4Nl82NCkgQXBw\nbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzEwMi4BmUwwIFNhZmFy\naS81MzcuMzYiICItIg==\n"
        }
    ]
}
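
The naming convention in the example above (method `_func_decode_snappy_compression`, expression `decode_snappy_compression(log)`) can be illustrated without any third-party packages. The sketch below is a hypothetical mini-registry, not how JMESPath or Powertools resolve functions internally, and it swaps snappy for the stdlib `zlib` module purely so that it runs anywhere.

```python
import base64
import zlib


class SketchFunctions:
    """Hypothetical mini-registry mimicking the `_func_` naming convention."""

    def call(self, name: str, value: str):
        # Expression name `decode_zlib` resolves to method `_func_decode_zlib`
        method = getattr(self, f"_func_{name}", None)
        if method is None:
            raise ValueError(f"Unknown function: {name}()")
        return method(value)

    def _func_decode_zlib(self, payload: str) -> bytes:
        # Same shape as the snappy example: base64-decode, then decompress
        return zlib.decompress(base64.b64decode(payload))


# Made-up event with one zlib-compressed, base64-encoded log line
event = {"Records": [{"log": base64.b64encode(zlib.compress(b"hello world")).decode()}]}

functions = SketchFunctions()

# Stand-in for Records[*].decode_zlib(log)
logs = [functions.call("decode_zlib", record["log"]) for record in event["Records"]]

print(logs)
```

As in the snappy example, the decoded values are `bytes`; decode them to `str` if they need to go into a JSON response.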

Last update: 2022-10-05