
Tracer

Tracer is an opinionated thin wrapper for the AWS X-Ray Python SDK.

Key features

  • Auto-capture cold start as an annotation, and responses or full exceptions as metadata
  • Auto-disable when not running in the AWS Lambda environment
  • Support for tracing async methods, generators, and context managers
  • Auto-patch modules supported by AWS X-Ray

Getting started

Tip

All examples shared in this documentation are available within the project repository.

Install

This is not necessary if you're installing Powertools via Lambda Layer/SAR

Add aws-lambda-powertools[tracer] as a dependency in your preferred tool: e.g., requirements.txt, pyproject.toml. This will ensure you have the required dependencies before using Tracer.

Permissions

Before you use this utility, your AWS Lambda function must have permissions to send traces to AWS X-Ray.

AWS Serverless Application Model (SAM) example
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: AWS Lambda Powertools Tracer doc examples

Globals:
  Function:
    Timeout: 5
    Runtime: python3.9
    Tracing: Active
    Environment:
      Variables:
        POWERTOOLS_SERVICE_NAME: payment
    Layers:
      # Find the latest Layer version in the official documentation
      # https://awslabs.github.io/aws-lambda-powertools-python/latest/#lambda-layer
      - !Sub arn:aws:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPython:21

Resources:
  CaptureLambdaHandlerExample:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../src
      Handler: capture_lambda_handler.handler

Lambda handler

You can get started quickly by initializing Tracer and using the capture_lambda_handler decorator on your Lambda handler.

Tracing Lambda handler with capture_lambda_handler
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()  # Sets service via POWERTOOLS_SERVICE_NAME env var
# OR tracer = Tracer(service="example")


def collect_payment(charge_id: str) -> str:
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return collect_payment(charge_id=charge_id)

capture_lambda_handler performs these additional tasks to ease operations:

  • Creates a ColdStart annotation so you can easily filter traces that incurred initialization overhead
  • Creates a Service annotation if the service parameter or POWERTOOLS_SERVICE_NAME is set
  • Captures any response, or full exception, generated by the handler and includes it as tracing metadata

Annotations & Metadata

Annotations are key-values associated with traces and indexed by AWS X-Ray. You can use them to filter traces and to create Trace Groups to slice and dice your transactions.

Adding annotations with put_annotation method
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


def collect_payment(charge_id: str) -> str:
    tracer.put_annotation(key="PaymentId", value=charge_id)
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return collect_payment(charge_id=charge_id)

Metadata are key-values also associated with traces but not indexed by AWS X-Ray. You can use them to add additional context for an operation using any native object.

Adding arbitrary metadata with put_metadata method
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


def collect_payment(charge_id: str) -> str:
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> str:
    payment_context = {
        "charge_id": event.get("charge_id", ""),
        "merchant_id": event.get("merchant_id", ""),
        "request_id": context.aws_request_id,
    }
    payment_context["receipt_id"] = collect_payment(charge_id=payment_context["charge_id"])
    tracer.put_metadata(key="payment_response", value=payment_context)

    return payment_context["receipt_id"]

Synchronous functions

You can trace synchronous functions using the capture_method decorator.

Tracing an arbitrary function with capture_method
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


@tracer.capture_method
def collect_payment(charge_id: str) -> str:
    tracer.put_annotation(key="PaymentId", value=charge_id)
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return collect_payment(charge_id=charge_id)

Note: Function responses are auto-captured and stored as JSON by default.

Use the capture_response parameter to override this behaviour.

Serialization is performed by aws-xray-sdk via the jsonpickle module. This can cause side effects for file-like objects such as the boto3 S3 StreamingBody, whose content can be read only once and is therefore consumed during serialization.
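
The read-once side effect can be reproduced without AWS. The sketch below is only an illustration: io.BytesIO stands in for a streaming body, and serialize_response is a hypothetical stand-in for whatever reads the response during serialization. It is not how aws-xray-sdk or jsonpickle are implemented.

```python
import io
import json


def serialize_response(body: io.BytesIO) -> str:
    """Hypothetical stand-in for a tracer that serializes a response by reading it."""
    return json.dumps({"response": body.read().decode()})


# io.BytesIO stands in for a read-once streaming body such as an S3 object
stream = io.BytesIO(b"payment report")

captured = serialize_response(stream)  # the "tracer" consumes the stream first
leftover = stream.read()  # the caller then finds nothing left to read

print(captured)  # {"response": "payment report"}
print(leftover)  # b''
```

Once the stream has been read for the trace, the function's caller receives an exhausted object, which is why disabling response capture matters for such return values.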

Asynchronous and generator functions

Warning

Asynchronous Lambda handlers are not supported.

You can trace asynchronous functions and generator functions (including context managers) using capture_method.

import asyncio

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


@tracer.capture_method
async def collect_payment(charge_id: str) -> str:
    tracer.put_annotation(key="PaymentId", value=charge_id)
    await asyncio.sleep(0.5)
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return asyncio.run(collect_payment(charge_id=charge_id))

Tracing a context manager with capture_method
import contextlib
from collections.abc import Generator

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()
logger = Logger()


@contextlib.contextmanager
@tracer.capture_method
def collect_payment(charge_id: str) -> Generator[str, None, None]:
    try:
        yield f"dummy payment collected for charge: {charge_id}"
    finally:
        tracer.put_annotation(key="PaymentId", value=charge_id)


@tracer.capture_lambda_handler
@logger.inject_lambda_context
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    with collect_payment(charge_id=charge_id) as receipt_id:
        logger.info(f"Processing payment collection for charge {charge_id} with receipt {receipt_id}")

    return receipt_id

Tracing a generator with capture_method
from collections.abc import Generator

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


@tracer.capture_method
def collect_payment(charge_id: str) -> Generator[str, None, None]:
    yield f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return next(collect_payment(charge_id=charge_id))

Advanced

Patching modules

By default, Tracer automatically patches all libraries supported by X-Ray during initialization. Underneath, the AWS X-Ray SDK checks whether a supported library has been imported before patching it.

If you're looking to shave a few microseconds, or milliseconds depending on your function's memory configuration, you can patch specific modules only using the patch_modules parameter:

Example of explicitly patching requests only
import requests

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

MODULES = ["requests"]

tracer = Tracer(patch_modules=MODULES)


@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> dict:
    ret = requests.get("https://httpbin.org/get")
    ret.raise_for_status()

    return ret.json()

Disabling response auto-capture

Use the capture_response=False parameter in both the capture_lambda_handler and capture_method decorators to instruct Tracer not to serialize function responses as metadata.

Info: This is useful in three common scenarios
  1. You might return sensitive information that you don't want added to your traces
  2. You might manipulate streaming objects that can be read only once; disabling capture prevents subsequent reads from coming back empty
  3. You might return more than 64K of data, which results in errors such as "message too long"

Disabling response auto-capture for sensitive information
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()
logger = Logger()


@tracer.capture_method(capture_response=False)
def collect_payment(charge_id: str) -> str:
    tracer.put_annotation(key="PaymentId", value=charge_id)
    logger.debug("Returning sensitive information....")
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler(capture_response=False)
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return collect_payment(charge_id=charge_id)

Disabling response auto-capture for streaming objects
import os

import boto3
from botocore.response import StreamingBody

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

BUCKET = os.getenv("BUCKET_NAME", "")
REPORT_KEY = os.getenv("REPORT_KEY", "")

tracer = Tracer()
logger = Logger()

session = boto3.Session()
s3 = session.client("s3")


@tracer.capture_method(capture_response=False)
def fetch_payment_report(payment_id: str) -> StreamingBody:
    ret = s3.get_object(Bucket=BUCKET, Key=f"{REPORT_KEY}/{payment_id}")
    logger.debug("Returning streaming body from S3 object....")
    return ret["Body"]  # boto3 returns the object content under the "Body" key


@tracer.capture_lambda_handler(capture_response=False)
def handler(event: dict, context: LambdaContext) -> str:
    payment_id = event.get("payment_id", "")
    report = fetch_payment_report(payment_id=payment_id)
    return report.read().decode()
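
For the third scenario above (large responses), a rough size check during development can help decide which functions need capture_response=False. The sketch below is an approximation under stated assumptions: fits_in_trace and MAX_CAPTURE_BYTES are hypothetical names, "64K" is interpreted as 64 * 1024 bytes, and the size is measured with the standard json module rather than the jsonpickle serialization that aws-xray-sdk performs.

```python
import json

# Assumption: treat the "64K" mentioned above as 64 * 1024 bytes of serialized data
MAX_CAPTURE_BYTES = 64 * 1024


def fits_in_trace(response: object, limit: int = MAX_CAPTURE_BYTES) -> bool:
    """Return True if the JSON-serialized response stays within the limit."""
    try:
        # default=str lets non-JSON types (e.g. datetime) be measured approximately
        serialized = json.dumps(response, default=str)
    except (TypeError, ValueError):
        return False  # not serializable (e.g. circular reference): do not capture
    return len(serialized.encode("utf-8")) <= limit


small = {"receipt_id": "abc123"}
large = {"report": "x" * (70 * 1024)}

print(fits_in_trace(small))  # True
print(fits_in_trace(large))  # False
```

Because capture_response is set on the decorator, a check like this is better suited to tests or exploratory runs than to per-invocation decisions.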

Disabling exception auto-capture

Use the capture_error=False parameter in both the capture_lambda_handler and capture_method decorators to instruct Tracer not to serialize exceptions as metadata.

Info

This is useful when exceptions or stack traces you don't control might contain sensitive information.

Disabling exception auto-capture for tracing metadata
import os

import requests

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()
ENDPOINT = os.getenv("PAYMENT_API", "")


class PaymentError(Exception):
    ...


@tracer.capture_method(capture_error=False)
def collect_payment(charge_id: str) -> dict:
    try:
        ret = requests.post(url=f"{ENDPOINT}/collect", data={"charge_id": charge_id})
        ret.raise_for_status()
        return ret.json()
    except requests.HTTPError as e:
        raise PaymentError(f"Unable to collect payment for charge {charge_id}") from e


@tracer.capture_lambda_handler(capture_error=False)
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    ret = collect_payment(charge_id=charge_id)

    return ret.get("receipt_id", "")

Ignoring certain HTTP endpoints

You might have endpoints whose requests you don't want traced, perhaps due to the volume of calls or sensitive URLs.

You can use the ignore_endpoint method with the hostname and/or URLs you'd like to be ignored. Globs (*) are allowed.

Ignoring certain HTTP endpoints from being traced
import os

import requests

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

ENDPOINT = os.getenv("PAYMENT_API", "")
IGNORE_URLS = ["/collect", "/refund"]

tracer = Tracer()
tracer.ignore_endpoint(hostname=ENDPOINT, urls=IGNORE_URLS)
tracer.ignore_endpoint(hostname=f"*.{ENDPOINT}", urls=IGNORE_URLS)  # `<stage>.ENDPOINT`


class PaymentError(Exception):
    ...


@tracer.capture_method(capture_error=False)
def collect_payment(charge_id: str) -> dict:
    try:
        ret = requests.post(url=f"{ENDPOINT}/collect", data={"charge_id": charge_id})
        ret.raise_for_status()
        return ret.json()
    except requests.HTTPError as e:
        raise PaymentError(f"Unable to collect payment for charge {charge_id}") from e


@tracer.capture_lambda_handler(capture_error=False)
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    ret = collect_payment(charge_id=charge_id)

    return ret.get("receipt_id", "")
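
To get a feel for what a hostname glob such as `*.ENDPOINT` covers, the sketch below uses Python's fnmatch module. This assumes the globs behave like shell-style wildcards; the exact matching performed by the X-Ray SDK may differ in detail, and the hostnames are hypothetical.

```python
from fnmatch import fnmatch

# Hypothetical hostnames, for illustration only
PATTERN = "*.payments.example.com"

candidates = [
    "dev.payments.example.com",
    "prod.payments.example.com",
    "payments.example.com",
    "payments.example.org",
]

# Keep only the hostnames that the glob pattern matches
matches = [host for host in candidates if fnmatch(host, PATTERN)]

print(matches)  # ['dev.payments.example.com', 'prod.payments.example.com']
```

Under these semantics the bare hostname is not matched by the `*.` pattern, which is consistent with the example above registering both the plain hostname and its wildcard variant.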

Tracing aiohttp requests

Info

This snippet assumes you have aiohttp as a dependency

You can use aiohttp_trace_config function to create a valid aiohttp trace_config object. This is necessary since X-Ray utilizes aiohttp trace hooks to capture requests end-to-end.

Tracing aiohttp requests
import asyncio
import os

import aiohttp

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.tracing import aiohttp_trace_config
from aws_lambda_powertools.utilities.typing import LambdaContext

ENDPOINT = os.getenv("PAYMENT_API", "")

tracer = Tracer()


@tracer.capture_method
async def collect_payment(charge_id: str) -> dict:
    async with aiohttp.ClientSession(trace_configs=[aiohttp_trace_config()]) as session:
        async with session.get(f"{ENDPOINT}/collect") as resp:
            return await resp.json()


@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> dict:
    charge_id = event.get("charge_id", "")
    return asyncio.run(collect_payment(charge_id=charge_id))

Escape hatch mechanism

You can use the tracer.provider attribute to access all methods provided by the AWS X-Ray xray_recorder object.

This is useful when you need an X-Ray feature that is not available in the Tracer utility, for example thread-safe tracing or context managers.

Tracing a code block with in_subsegment escape hatch
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


def collect_payment(charge_id: str) -> str:
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    with tracer.provider.in_subsegment("## collect_payment") as subsegment:
        subsegment.put_annotation(key="PaymentId", value=charge_id)
        ret = collect_payment(charge_id=charge_id)
        subsegment.put_metadata(key="payment_response", value=ret)

    return ret

Concurrent asynchronous functions

Warning

The X-Ray SDK will raise an exception when async functions are run and traced concurrently.

A safe workaround is to use in_subsegment_async, available via the Tracer escape hatch (tracer.provider).

Workaround to safely trace async concurrent functions
import asyncio

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


async def another_async_task():
    async with tracer.provider.in_subsegment_async("## another_async_task") as subsegment:
        subsegment.put_annotation(key="key", value="value")
        subsegment.put_metadata(key="key", value="value", namespace="namespace")
        ...


async def another_async_task_2():
    async with tracer.provider.in_subsegment_async("## another_async_task_2") as subsegment:
        subsegment.put_annotation(key="key", value="value")
        subsegment.put_metadata(key="key", value="value", namespace="namespace")
        ...


async def collect_payment(charge_id: str) -> str:
    await asyncio.gather(another_async_task(), another_async_task_2())
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return asyncio.run(collect_payment(charge_id=charge_id))

Reusing Tracer across your code

Tracer keeps a copy of its configuration after the first initialization. This is useful for scenarios where you want to use Tracer in more than one location across your code base.

Warning: Import order matters when using Lambda Layers or multiple modules

Do not set auto_patch=False when reusing Tracer in Lambda Layers or in multiple modules.

Doing so can result in the first Tracer configuration being inherited by new instances, leaving their modules unpatched.

Tracer automatically skips imported modules that have already been patched.

from tracer_reuse_module import collect_payment

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


@tracer.capture_lambda_handler
def handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return collect_payment(charge_id=charge_id)

A new instance of Tracer will be created but will reuse the previous Tracer instance configuration, similar to a Singleton.

from aws_lambda_powertools import Tracer

tracer = Tracer()


@tracer.capture_method
def collect_payment(charge_id: str) -> str:
    return f"dummy payment collected for charge: {charge_id}"
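
The "similar to a Singleton" behaviour described above can be pictured with a plain-Python sketch. This is not Tracer's actual implementation: ReusableConfig and its fields are hypothetical and only illustrate how a new instance can inherit configuration set by an earlier one, including an auto_patch=False that later instances did not ask for.

```python
import copy
from typing import Optional


class ReusableConfig:
    """Hypothetical class whose instances inherit earlier explicit configuration."""

    # Class-level configuration shared by every instance
    _shared: dict = {"service": "unknown", "auto_patch": True}

    def __init__(self, service: Optional[str] = None, auto_patch: Optional[bool] = None):
        # Only explicitly passed values overwrite the shared configuration
        if service is not None:
            type(self)._shared["service"] = service
        if auto_patch is not None:
            type(self)._shared["auto_patch"] = auto_patch
        # Each instance gets its own copy of what has been configured so far
        self.config = copy.copy(type(self)._shared)


first = ReusableConfig(service="payment", auto_patch=False)
second = ReusableConfig()  # e.g. created later in another module

print(second is first)  # False
print(second.config["service"])  # payment
print(second.config["auto_patch"])  # False
```

The second instance is a distinct object, yet it picks up both the service name and the disabled patching from the first, which mirrors the import-order warning above.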

Testing your code

Tracer is disabled by default when not running in the AWS Lambda environment. This means you don't need to change any code or set any environment variables to test your functions.

Tips

  • Use annotations on key operations to slice and dice traces, create unique views, and create metrics from them via Trace Groups
  • Use a namespace when adding metadata to group data more easily
  • Annotations and metadata are added to the currently open subsegment. If you want them in a specific subsegment, use a context manager via the escape hatch mechanism

Last update: 2022-10-24