Skip to content

Tracing

CLASS DESCRIPTION
Tracer

Tracer using AWS-XRay to provide decorators with known defaults for Lambda functions

Tracer

Tracer(
    service: str | None = None,
    disabled: bool | None = None,
    auto_patch: bool | None = None,
    patch_modules: Sequence[str] | None = None,
    provider: BaseProvider | None = None,
)

Tracer using AWS-XRay to provide decorators with known defaults for Lambda functions

When running locally, it detects whether it's running via SAM CLI, and if it is it returns dummy segments/subsegments instead.

By default, it patches all available libraries supported by X-Ray SDK. Patching is automatically disabled when running locally via SAM CLI or by any other means.

Ref: https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/thirdparty.html

Tracer keeps a copy of its configuration as it can be instantiated more than once. This is useful when you are using your own middlewares and want to utilize an existing Tracer. Make sure to set auto_patch=False in subsequent Tracer instances to avoid double patching.

Environment variables

POWERTOOLS_TRACE_DISABLED : str disable tracer (e.g. "true", "True", "TRUE") POWERTOOLS_SERVICE_NAME : str service name POWERTOOLS_TRACER_CAPTURE_RESPONSE : str disable auto-capture response as metadata (e.g. "true", "True", "TRUE") POWERTOOLS_TRACER_CAPTURE_ERROR : str disable auto-capture error as metadata (e.g. "true", "True", "TRUE")

PARAMETER DESCRIPTION
service

Service name that will be appended in all tracing metadata

TYPE: str | None DEFAULT: None

auto_patch

Patch existing imported modules during initialization, by default True

TYPE: bool | None DEFAULT: None

disabled

Flag to explicitly disable tracing, useful when running/testing locally Env POWERTOOLS_TRACE_DISABLED="true"

TYPE: bool | None DEFAULT: None

patch_modules

Tuple of modules supported by tracing provider to patch, by default all modules are patched

TYPE: Sequence[str] | None DEFAULT: None

provider

Tracing provider, by default it is aws_xray_sdk.core.xray_recorder

TYPE: BaseProvider | None DEFAULT: None

RETURNS DESCRIPTION
Tracer

Tracer instance with imported modules patched

Example

A Lambda function using Tracer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from aws_lambda_powertools import Tracer
tracer = Tracer(service="greeting")

@tracer.capture_method
def greeting(name: str) -> dict:
    return {
        "name": name
    }

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> dict:
    print("Received event from Lambda...")
    response = greeting(name="Heitor")
    return response

Booking Lambda function using Tracer that adds additional annotation/metadata

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
def confirm_booking(booking_id: str) -> dict:
        resp = add_confirmation(booking_id)

        tracer.put_annotation("BookingConfirmation", resp["requestId"])
        tracer.put_metadata("Booking confirmation", resp)

        return resp

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> dict:
    print("Received event from Lambda...")
    booking_id = event.get("booking_id")
    response = confirm_booking(booking_id=booking_id)
    return response

A Lambda function using service name via POWERTOOLS_SERVICE_NAME

1
2
3
4
5
6
7
8
9
export POWERTOOLS_SERVICE_NAME="booking"
from aws_lambda_powertools import Tracer
tracer = Tracer()

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> dict:
    print("Received event from Lambda...")
    response = greeting(name="Lessa")
    return response

Reuse an existing instance of Tracer anywhere in the code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# lambda_handler.py
from aws_lambda_powertools import Tracer
tracer = Tracer()

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> dict:
    ...

# utils.py
from aws_lambda_powertools import Tracer
tracer = Tracer()
...
Limitations
  • Async handler not supported
METHOD DESCRIPTION
capture_lambda_handler

Decorator to create subsegment for lambda handlers

capture_method

Decorator to create subsegment for arbitrary functions

ignore_endpoint

If you want to ignore certain httplib requests you can do so based on the hostname or URL that is being

patch

Patch modules for instrumentation.

put_annotation

Adds annotation to existing segment or subsegment

put_metadata

Adds metadata to existing segment or subsegment

Source code in aws_lambda_powertools/tracing/tracer.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def __init__(
    self,
    service: str | None = None,
    disabled: bool | None = None,
    auto_patch: bool | None = None,
    patch_modules: Sequence[str] | None = None,
    provider: BaseProvider | None = None,
):
    self.__build_config(
        service=service,
        disabled=disabled,
        auto_patch=auto_patch,
        patch_modules=patch_modules,
        provider=provider,
    )
    self.provider: BaseProvider = self._config["provider"]
    self.disabled = self._config["disabled"]
    self.service = self._config["service"]
    self.auto_patch = self._config["auto_patch"]

    if self.disabled:
        self._disable_tracer_provider()

    if self.auto_patch:
        self.patch(modules=patch_modules)

    if self._is_xray_provider():
        self._disable_xray_trace_batching()

capture_lambda_handler

capture_lambda_handler(
    lambda_handler: (
        Callable[[T, Any], Any]
        | Callable[[T, Any, Any], Any]
        | None
    ) = None,
    capture_response: bool | None = None,
    capture_error: bool | None = None,
)

Decorator to create subsegment for lambda handlers

As Lambda follows (event, context) signature we can remove some of the boilerplate and also capture any exception any Lambda function throws or its response as metadata

PARAMETER DESCRIPTION
lambda_handler

Method to annotate on

TYPE: Callable DEFAULT: None

capture_response

Instructs tracer to not include handler's response as metadata

TYPE: bool DEFAULT: None

capture_error

Instructs tracer to not include handler's error as metadata, by default True

TYPE: bool DEFAULT: None

Example

Lambda function using capture_lambda_handler decorator

1
2
3
4
tracer = Tracer(service="payment")
@tracer.capture_lambda_handler
def handler(event, context):
    ...

Preventing Tracer to log response as metadata

1
2
3
4
tracer = Tracer(service="payment")
@tracer.capture_lambda_handler(capture_response=False)
def handler(event, context):
    ...
RAISES DESCRIPTION
err

Exception raised by method

Source code in aws_lambda_powertools/tracing/tracer.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def capture_lambda_handler(
    self,
    lambda_handler: Callable[[T, Any], Any] | Callable[[T, Any, Any], Any] | None = None,
    capture_response: bool | None = None,
    capture_error: bool | None = None,
):
    """Decorator to create subsegment for lambda handlers

    As Lambda follows (event, context) signature we can remove some of the boilerplate
    and also capture any exception any Lambda function throws or its response as metadata

    Parameters
    ----------
    lambda_handler : Callable
        Method to annotate on
    capture_response : bool, optional
        Instructs tracer to not include handler's response as metadata
    capture_error : bool, optional
        Instructs tracer to not include handler's error as metadata, by default True

    Example
    -------
    **Lambda function using capture_lambda_handler decorator**

        tracer = Tracer(service="payment")
        @tracer.capture_lambda_handler
        def handler(event, context):
            ...

    **Preventing Tracer to log response as metadata**

        tracer = Tracer(service="payment")
        @tracer.capture_lambda_handler(capture_response=False)
        def handler(event, context):
            ...

    Raises
    ------
    err
        Exception raised by method
    """
    # If handler is None we've been called with parameters
    # Return a partial function with args filled
    if lambda_handler is None:
        logger.debug("Decorator called with parameters")
        return functools.partial(
            self.capture_lambda_handler,
            capture_response=capture_response,
            capture_error=capture_error,
        )

    lambda_handler_name = lambda_handler.__name__
    capture_response = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"),
        choice=capture_response,
    )
    capture_error = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"),
        choice=capture_error,
    )

    @functools.wraps(lambda_handler)
    def decorate(event, context, **kwargs):
        with self.provider.in_subsegment(name=f"## {lambda_handler_name}") as subsegment:
            try:
                logger.debug("Calling lambda handler")
                response = lambda_handler(event, context, **kwargs)
                logger.debug("Received lambda handler response successfully")
                self._add_response_as_metadata(
                    method_name=lambda_handler_name,
                    data=response,
                    subsegment=subsegment,
                    capture_response=capture_response,
                )
            except Exception as err:
                logger.exception(f"Exception received from {lambda_handler_name}")
                self._add_full_exception_as_metadata(
                    method_name=lambda_handler_name,
                    error=err,
                    subsegment=subsegment,
                    capture_error=capture_error,
                )

                raise
            finally:
                global is_cold_start
                logger.debug("Annotating cold start")
                subsegment.put_annotation(key="ColdStart", value=is_cold_start)

                if is_cold_start:
                    is_cold_start = False

                if self.service:
                    subsegment.put_annotation(key="Service", value=self.service)

            return response

    return decorate

capture_method

capture_method(method: AnyCallableT) -> AnyCallableT
capture_method(
    method: None = None,
    capture_response: bool | None = None,
    capture_error: bool | None = None,
) -> Callable[[AnyCallableT], AnyCallableT]
capture_method(
    method: AnyCallableT | None = None,
    capture_response: bool | None = None,
    capture_error: bool | None = None,
) -> AnyCallableT

Decorator to create subsegment for arbitrary functions

It also captures both response and exceptions as metadata and creates a subsegment named ## <method_module.method_qualifiedname>

see here: Qualified name for classes and functions

When running async functions concurrently, methods may impact each others subsegment, and can trigger and AlreadyEndedException from X-Ray due to async nature.

For this use case, either use capture_method only where async.gather is called, or use in_subsegment_async context manager via our escape hatch mechanism - See examples.

PARAMETER DESCRIPTION
method

Method to annotate on

TYPE: Callable DEFAULT: None

capture_response

Instructs tracer to not include method's response as metadata

TYPE: bool DEFAULT: None

capture_error

Instructs tracer to not include handler's error as metadata, by default True

TYPE: bool DEFAULT: None

Example

Custom function using capture_method decorator

1
2
3
tracer = Tracer(service="payment")
@tracer.capture_method
def some_function()

Custom async method using capture_method decorator

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
async def confirm_booking(booking_id: str) -> dict:
    resp = call_to_booking_service()

    tracer.put_annotation("BookingConfirmation", resp["requestId"])
    tracer.put_metadata("Booking confirmation", resp)

    return resp

def lambda_handler(event: dict, context: Any) -> dict:
    booking_id = event.get("booking_id")
    asyncio.run(confirm_booking(booking_id=booking_id))

Custom generator function using capture_method decorator

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
def bookings_generator(booking_id):
    resp = call_to_booking_service()
    yield resp[0]
    yield resp[1]

def lambda_handler(event: dict, context: Any) -> dict:
    gen = bookings_generator(booking_id=booking_id)
    result = list(gen)

Custom generator context manager using capture_method decorator

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
@contextlib.contextmanager
def booking_actions(booking_id):
    resp = call_to_booking_service()
    yield "example result"
    cleanup_stuff()

def lambda_handler(event: dict, context: Any) -> dict:
    booking_id = event.get("booking_id")

    with booking_actions(booking_id=booking_id) as booking:
        result = booking

Tracing nested async calls

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
async def get_identity():
    ...

@tracer.capture_method
async def long_async_call():
    ...

@tracer.capture_method
async def async_tasks():
    await get_identity()
    ret = await long_async_call()

    return { "task": "done", **ret }

Safely tracing concurrent async calls with decorator

This may not needed once this bug is closed

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

async def get_identity():
    async with aioboto3.client("sts") as sts:
        account = await sts.get_caller_identity()
        return account

async def long_async_call():
    ...

@tracer.capture_method
async def async_tasks():
    _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

    return { "task": "done", **ret }

Safely tracing each concurrent async calls with escape hatch

This may not needed once this bug is closed

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

async def get_identity():
    async tracer.provider.in_subsegment_async("## get_identity"):
        ...

async def long_async_call():
    async tracer.provider.in_subsegment_async("## long_async_call"):
        ...

@tracer.capture_method
async def async_tasks():
    _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

    return { "task": "done", **ret }
RAISES DESCRIPTION
err

Exception raised by method

Source code in aws_lambda_powertools/tracing/tracer.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
def capture_method(
    self,
    method: AnyCallableT | None = None,
    capture_response: bool | None = None,
    capture_error: bool | None = None,
) -> AnyCallableT:
    """Decorator to create subsegment for arbitrary functions

    It also captures both response and exceptions as metadata
    and creates a subsegment named `## <method_module.method_qualifiedname>`
    # see here: [Qualified name for classes and functions](https://peps.python.org/pep-3155/)

    When running [async functions concurrently](https://docs.python.org/3/library/asyncio-task.html#id6),
    methods may impact each others subsegment, and can trigger
    and AlreadyEndedException from X-Ray due to async nature.

    For this use case, either use `capture_method` only where
    `async.gather` is called, or use `in_subsegment_async`
    context manager via our escape hatch mechanism - See examples.

    Parameters
    ----------
    method : Callable
        Method to annotate on
    capture_response : bool, optional
        Instructs tracer to not include method's response as metadata
    capture_error : bool, optional
        Instructs tracer to not include handler's error as metadata, by default True

    Example
    -------
    **Custom function using capture_method decorator**

        tracer = Tracer(service="payment")
        @tracer.capture_method
        def some_function()

    **Custom async method using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        async def confirm_booking(booking_id: str) -> dict:
            resp = call_to_booking_service()

            tracer.put_annotation("BookingConfirmation", resp["requestId"])
            tracer.put_metadata("Booking confirmation", resp)

            return resp

        def lambda_handler(event: dict, context: Any) -> dict:
            booking_id = event.get("booking_id")
            asyncio.run(confirm_booking(booking_id=booking_id))

    **Custom generator function using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        def bookings_generator(booking_id):
            resp = call_to_booking_service()
            yield resp[0]
            yield resp[1]

        def lambda_handler(event: dict, context: Any) -> dict:
            gen = bookings_generator(booking_id=booking_id)
            result = list(gen)

    **Custom generator context manager using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        @contextlib.contextmanager
        def booking_actions(booking_id):
            resp = call_to_booking_service()
            yield "example result"
            cleanup_stuff()

        def lambda_handler(event: dict, context: Any) -> dict:
            booking_id = event.get("booking_id")

            with booking_actions(booking_id=booking_id) as booking:
                result = booking

    **Tracing nested async calls**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        async def get_identity():
            ...

        @tracer.capture_method
        async def long_async_call():
            ...

        @tracer.capture_method
        async def async_tasks():
            await get_identity()
            ret = await long_async_call()

            return { "task": "done", **ret }

    **Safely tracing concurrent async calls with decorator**

    This may not needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        async def get_identity():
            async with aioboto3.client("sts") as sts:
                account = await sts.get_caller_identity()
                return account

        async def long_async_call():
            ...

        @tracer.capture_method
        async def async_tasks():
            _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

            return { "task": "done", **ret }

    **Safely tracing each concurrent async calls with escape hatch**

    This may not needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        async def get_identity():
            async tracer.provider.in_subsegment_async("## get_identity"):
                ...

        async def long_async_call():
            async tracer.provider.in_subsegment_async("## long_async_call"):
                ...

        @tracer.capture_method
        async def async_tasks():
            _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

            return { "task": "done", **ret }

    Raises
    ------
    err
        Exception raised by method
    """
    # If method is None we've been called with parameters
    # Return a partial function with args filled
    if method is None:
        logger.debug("Decorator called with parameters")
        return cast(
            AnyCallableT,
            functools.partial(self.capture_method, capture_response=capture_response, capture_error=capture_error),
        )

    # Example: app.ClassA.get_all  # noqa ERA001
    # Valid characters can be found at http://docs.aws.amazon.com/xray/latest/devguide/xray-api-segmentdocuments.html
    method_name = sanitize_xray_segment_name(f"{method.__module__}.{method.__qualname__}")

    capture_response = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"),
        choice=capture_response,
    )
    capture_error = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"),
        choice=capture_error,
    )

    # Maintenance: Need a factory/builder here to simplify this now
    if inspect.iscoroutinefunction(method):
        return self._decorate_async_function(
            method=method,
            capture_response=capture_response,
            capture_error=capture_error,
            method_name=method_name,
        )
    elif inspect.isgeneratorfunction(method):
        return self._decorate_generator_function(
            method=method,
            capture_response=capture_response,
            capture_error=capture_error,
            method_name=method_name,
        )
    elif hasattr(method, "__wrapped__") and inspect.isgeneratorfunction(method.__wrapped__):
        return self._decorate_generator_function_with_context_manager(
            method=method,
            capture_response=capture_response,
            capture_error=capture_error,
            method_name=method_name,
        )
    else:
        return self._decorate_sync_function(
            method=method,
            capture_response=capture_response,
            capture_error=capture_error,
            method_name=method_name,
        )

ignore_endpoint

ignore_endpoint(
    hostname: str | None = None,
    urls: list[str] | None = None,
)

If you want to ignore certain httplib requests you can do so based on the hostname or URL that is being requested.

NOTE: If the provider is not xray, nothing will be added to ignore list

Documentation
  • https://github.com/aws/aws-xray-sdk-python#ignoring-httplib-requests
PARAMETER DESCRIPTION
hostname

The hostname is matched using the Python fnmatch library which does Unix glob style matching.

TYPE: (Optional, str) DEFAULT: None

urls

List of urls to ignore. Example tracer.ignore_endpoint(urls=["/ignored-url"])

TYPE: list[str] | None DEFAULT: None

Source code in aws_lambda_powertools/tracing/tracer.py
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
def ignore_endpoint(self, hostname: str | None = None, urls: list[str] | None = None):
    """If you want to ignore certain httplib requests you can do so based on the hostname or URL that is being
    requested.

    > NOTE: If the provider is not xray, nothing will be added to ignore list

    Documentation
    --------------
    - https://github.com/aws/aws-xray-sdk-python#ignoring-httplib-requests

    Parameters
    ----------
    hostname : Optional, str
        The hostname is matched using the Python fnmatch library which does Unix glob style matching.
    urls: Optional, list[str]
        List of urls to ignore. Example `tracer.ignore_endpoint(urls=["/ignored-url"])`
    """
    if not self._is_xray_provider():
        return

    from aws_xray_sdk.ext.httplib import add_ignored  # type: ignore

    add_ignored(hostname=hostname, urls=urls)

patch

patch(modules: Sequence[str] | None = None)

Patch modules for instrumentation.

Patches all supported modules by default if none are given.

PARAMETER DESCRIPTION
modules

List of modules to be patched, optional by default

TYPE: Sequence[str] | None DEFAULT: None

Source code in aws_lambda_powertools/tracing/tracer.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def patch(self, modules: Sequence[str] | None = None):
    """Patch modules for instrumentation.

    Patches all supported modules by default if none are given.

    Parameters
    ----------
    modules : Sequence[str] | None
        List of modules to be patched, optional by default
    """
    if self.disabled:
        logger.debug("Tracing has been disabled, aborting patch")
        return

    if modules is None:
        self.provider.patch_all()
    else:
        self.provider.patch(modules)

put_annotation

put_annotation(key: str, value: str | Number | bool)

Adds annotation to existing segment or subsegment

PARAMETER DESCRIPTION
key

Annotation key

TYPE: str

value

Value for annotation

TYPE: str | Number | bool

Example

Custom annotation for a pseudo service named payment

1
2
tracer = Tracer(service="payment")
tracer.put_annotation("PaymentStatus", "CONFIRMED")
Source code in aws_lambda_powertools/tracing/tracer.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def put_annotation(self, key: str, value: str | numbers.Number | bool):
    """Adds annotation to existing segment or subsegment

    Parameters
    ----------
    key : str
        Annotation key
    value : str | numbers.Number | bool
        Value for annotation

    Example
    -------
    Custom annotation for a pseudo service named payment

        tracer = Tracer(service="payment")
        tracer.put_annotation("PaymentStatus", "CONFIRMED")
    """
    if self.disabled:
        logger.debug("Tracing has been disabled, aborting put_annotation")
        return

    logger.debug(f"Annotating on key '{key}' with '{value}'")
    self.provider.put_annotation(key=key, value=value)

put_metadata

put_metadata(
    key: str, value: Any, namespace: str | None = None
)

Adds metadata to existing segment or subsegment

PARAMETER DESCRIPTION
key

Metadata key

TYPE: str

value

Value for metadata

TYPE: any

namespace

Namespace that metadata will lie under, by default None

TYPE: str DEFAULT: None

Example

Custom metadata for a pseudo service named payment

1
2
3
tracer = Tracer(service="payment")
response = collect_payment()
tracer.put_metadata("Payment collection", response)
Source code in aws_lambda_powertools/tracing/tracer.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def put_metadata(self, key: str, value: Any, namespace: str | None = None):
    """Adds metadata to existing segment or subsegment

    Parameters
    ----------
    key : str
        Metadata key
    value : any
        Value for metadata
    namespace : str, optional
        Namespace that metadata will lie under, by default None

    Example
    -------
    Custom metadata for a pseudo service named payment

        tracer = Tracer(service="payment")
        response = collect_payment()
        tracer.put_metadata("Payment collection", response)
    """
    if self.disabled:
        logger.debug("Tracing has been disabled, aborting put_metadata")
        return

    namespace = namespace or self.service
    logger.debug(f"Adding metadata on key '{key}' with '{value}' at namespace '{namespace}'")
    self.provider.put_metadata(key=key, value=value, namespace=namespace)