Package aws_lambda_powertools

Top-level package for Lambda Python Powertools.

Expand source code
# -*- coding: utf-8 -*-

"""Top-level package for Lambda Python Powertools."""

from pathlib import Path

from .logging import Logger
from .metrics import Metrics, single_metric
from .package_logger import set_package_logger_handler
from .tracing import Tracer

# Package author metadata.
__author__ = """Amazon Web Services"""
# Public names exported by `from aws_lambda_powertools import *`.
__all__ = [
    "Logger",
    "Metrics",
    "single_metric",
    "Tracer",
]

# Directory that contains this package's `__init__` module.
PACKAGE_PATH = Path(__file__).parent

# Runs once at import time.
# NOTE(review): presumably attaches a default handler to the package's own logger -- confirm in `.package_logger`.
set_package_logger_handler()

Sub-modules

aws_lambda_powertools.event_handler

Event handler decorators for common Lambda events

aws_lambda_powertools.exceptions

Shared exceptions that don't belong to a single utility

aws_lambda_powertools.logging

Logging utility

aws_lambda_powertools.metrics

CloudWatch Embedded Metric Format utility

aws_lambda_powertools.middleware_factory

Utilities to enhance middlewares

aws_lambda_powertools.package_logger
aws_lambda_powertools.shared
aws_lambda_powertools.tracing

Tracing utility

aws_lambda_powertools.utilities

General utilities for Powertools

Functions

def single_metric(name: str, unit: MetricUnit, value: float, namespace: Optional[str] = None) ‑> Generator[SingleMetric, None, None]

Context manager to simplify creation of a single metric

Example

Creates cold start metric with function_version as dimension

from aws_lambda_powertools import single_metric
from aws_lambda_powertools.metrics import MetricUnit

with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, namespace="ServerlessAirline") as metric:
    metric.add_dimension(name="function_version", value="47")

Same as above but set namespace using environment variable

$ export POWERTOOLS_METRICS_NAMESPACE="ServerlessAirline"

from aws_lambda_powertools import single_metric
from aws_lambda_powertools.metrics import MetricUnit

with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1) as metric:
    metric.add_dimension(name="function_version", value="47")

Parameters

name : str
Metric name
unit : MetricUnit
aws_lambda_powertools.helper.models.MetricUnit
value : float
Metric value
namespace : str
Namespace for metrics

Yields

SingleMetric
SingleMetric class instance

Raises

MetricUnitError
When metric unit isn't supported by CloudWatch
MetricValueError
When metric value isn't a number
SchemaValidationError
When metric object fails EMF schema validation
Expand source code
@contextmanager
def single_metric(
    name: str, unit: MetricUnit, value: float, namespace: Optional[str] = None
) -> Generator[SingleMetric, None, None]:
    """Context manager to simplify creation of a single metric

    The metric is serialized and printed to standard output as compact JSON
    when the `with` block exits normally. Nothing is printed when the block,
    the metric creation or the serialization raises.

    Example
    -------
    **Creates cold start metric with function_version as dimension**

        from aws_lambda_powertools import single_metric
        from aws_lambda_powertools.metrics import MetricUnit

        with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, namespace="ServerlessAirline") as metric:
            metric.add_dimension(name="function_version", value="47")

    **Same as above but set namespace using environment variable**

        $ export POWERTOOLS_METRICS_NAMESPACE="ServerlessAirline"

        from aws_lambda_powertools import single_metric
        from aws_lambda_powertools.metrics import MetricUnit

        with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1) as metric:
            metric.add_dimension(name="function_version", value="47")

    Parameters
    ----------
    name : str
        Metric name
    unit : MetricUnit
        `aws_lambda_powertools.helper.models.MetricUnit`
    value : float
        Metric value
    namespace: str
        Namespace for metrics

    Yields
    ------
    SingleMetric
        SingleMetric class instance

    Raises
    ------
    MetricUnitError
        When metric unit isn't supported by CloudWatch
    MetricValueError
        When metric value isn't a number
    SchemaValidationError
        When metric object fails EMF schema validation
    """
    metric_set: Optional[Dict] = None
    try:
        metric: SingleMetric = SingleMetric(namespace=namespace)
        metric.add_metric(name=name, unit=unit, value=value)
        yield metric
        metric_set = metric.serialize_metric_set()
    finally:
        # `metric_set` is still None when anything above raised; skip the flush
        # in that case instead of emitting a literal `null` line to stdout.
        if metric_set is not None:
            print(json.dumps(metric_set, separators=(",", ":")))

Classes

class Logger (service: Optional[str] = None, level: Union[str, int, None] = None, child: bool = False, sampling_rate: Optional[float] = None, stream: Optional[IO[str]] = None, logger_formatter: Optional[~PowertoolsFormatter] = None, logger_handler: Optional[logging.Handler] = None, json_serializer: Optional[Callable[[Dict[~KT, ~VT]], str]] = None, json_deserializer: Optional[Callable[[Union[Dict[~KT, ~VT], str, bool, int, float]], str]] = None, json_default: Optional[Callable[[Any], Any]] = None, datefmt: Optional[str] = None, use_datetime_directive: bool = False, log_record_order: Optional[List[str]] = None, utc: bool = False, use_rfc3339: bool = False, **kwargs)

Creates and setups a logger to format statements in JSON.

Includes service name and any additional key=value into logs. It also accepts both service name and level explicitly via env vars.

Environment Variables

POWERTOOLS_SERVICE_NAME : str
service name
LOG_LEVEL : str
logging level (e.g. INFO, DEBUG)
POWERTOOLS_LOGGER_SAMPLE_RATE : float
sampling rate ranging from 0 to 1, 1 being 100% sampling

Parameters

service : str, optional
service name to be appended in logs, by default "service_undefined"
level : str, int optional
logging.level, by default "INFO"
child : bool, optional
create a child Logger named <service>.<caller_file_name>, False by default
sampling_rate : float, optional
sample rate for debug calls within execution context defaults to 0.0
stream : sys.stdout, optional
valid output for a logging stream, by default sys.stdout
logger_formatter : PowertoolsFormatter, optional
custom logging formatter that implements PowertoolsFormatter
logger_handler : logging.Handler, optional
custom logging handler e.g. logging.FileHandler("file.log")

Parameters Propagated To LambdaPowertoolsFormatter

datefmt : str, optional
String directives (strftime) to format log timestamp using time, by default it uses 2021-05-03 11:47:12,494+0200.
use_datetime_directive : bool, optional
Interpret datefmt as a format string for datetime.datetime.strftime, rather than time.strftime.

See <https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior>. This
also supports a custom %F directive for milliseconds.

use_rfc3339 : bool, optional
Whether to use a popular date format that complies with both RFC3339 and ISO8601, e.g. 2022-10-27T16:27:43.738+02:00.
json_serializer : Callable, optional
function to serialize obj to a JSON formatted str, by default json.dumps
json_deserializer : Callable, optional
function to deserialize str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
json_default : Callable, optional
function to coerce unserializable values, by default str()

Only used when no custom formatter is set

utc : bool, optional
set logging timestamp to UTC, by default False to continue to use local time as per stdlib
log_record_order : list, optional
set order of log keys when logging, by default ["level", "location", "message", "timestamp"]

Example

Setups structured logging in JSON for Lambda functions with explicit service name

>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> def handler(event, context):
        logger.info("Hello")

Setups structured logging in JSON for Lambda functions using env vars

$ export POWERTOOLS_SERVICE_NAME="payment"
$ export POWERTOOLS_LOGGER_SAMPLE_RATE=0.01 # 1% debug sampling
>>> from aws_lambda_powertools import Logger
>>> logger = Logger()
>>>
>>> def handler(event, context):
        logger.info("Hello")

Append payment_id to previously setup logger

>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> def handler(event, context):
        logger.append_keys(payment_id=event["payment_id"])
        logger.info("Hello")

Create child Logger using logging inheritance via child param

>>> # app.py
>>> import another_file
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> # another_file.py
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment", child=True)

Logging in UTC timezone

>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", utc=True)

Brings message as the first key in log statements

>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", log_record_order=["message"])

Logging to a file instead of standard output for testing

>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", logger_handler=logging.FileHandler("log.json"))

Raises

InvalidLoggerSamplingRateError
When sampling rate provided is not a float

Initialize the logger with a name and an optional level.

Expand source code
class Logger(logging.Logger):  # lgtm [py/missing-call-to-init]
    """Creates and setups a logger to format statements in JSON.

    Includes service name and any additional key=value into logs
    It also accepts both service name or level explicitly via env vars

    Environment variables
    ---------------------
    POWERTOOLS_SERVICE_NAME : str
        service name
    LOG_LEVEL: str
        logging level (e.g. INFO, DEBUG)
    POWERTOOLS_LOGGER_SAMPLE_RATE: float
        sampling rate ranging from 0 to 1, 1 being 100% sampling

    Parameters
    ----------
    service : str, optional
        service name to be appended in logs, by default "service_undefined"
    level : str, int optional
        logging.level, by default "INFO"
    child: bool, optional
        create a child Logger named <service>.<caller_file_name>, False by default
    sample_rate: float, optional
        sample rate for debug calls within execution context defaults to 0.0
    stream: sys.stdout, optional
        valid output for a logging stream, by default sys.stdout
    logger_formatter: PowertoolsFormatter, optional
        custom logging formatter that implements PowertoolsFormatter
    logger_handler: logging.Handler, optional
        custom logging handler e.g. logging.FileHandler("file.log")

    Parameters propagated to LambdaPowertoolsFormatter
    --------------------------------------------------
    datefmt: str, optional
        String directives (strftime) to format log timestamp using `time`, by default it uses 2021-05-03 11:47:12,494+0200. # noqa: E501
    use_datetime_directive: bool, optional
        Interpret `datefmt` as a format string for `datetime.datetime.strftime`, rather than
        `time.strftime`.

        See https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior . This
        also supports a custom %F directive for milliseconds.
    use_rfc3339: bool, optional
        Whether to use a popular date format that complies with both RFC3339 and ISO8601.
        e.g., 2022-10-27T16:27:43.738+02:00.
    json_serializer : Callable, optional
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer : Callable, optional
        function to deserialize `str`, `bytes`, bytearray` containing a JSON document to a Python `obj`,
        by default json.loads
    json_default : Callable, optional
        function to coerce unserializable values, by default `str()`

        Only used when no custom formatter is set
    utc : bool, optional
        set logging timestamp to UTC, by default False to continue to use local time as per stdlib
    log_record_order : list, optional
        set order of log keys when logging, by default ["level", "location", "message", "timestamp"]

    Example
    -------
    **Setups structured logging in JSON for Lambda functions with explicit service name**

        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment")
        >>>
        >>> def handler(event, context):
                logger.info("Hello")

    **Setups structured logging in JSON for Lambda functions using env vars**

        $ export POWERTOOLS_SERVICE_NAME="payment"
        $ export POWERTOOLS_LOGGER_SAMPLE_RATE=0.01 # 1% debug sampling
        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger()
        >>>
        >>> def handler(event, context):
                logger.info("Hello")

    **Append payment_id to previously setup logger**

        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment")
        >>>
        >>> def handler(event, context):
                logger.append_keys(payment_id=event["payment_id"])
                logger.info("Hello")

    **Create child Logger using logging inheritance via child param**

        >>> # app.py
        >>> import another_file
        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment")
        >>>
        >>> # another_file.py
        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment", child=True)

    **Logging in UTC timezone**

        >>> # app.py
        >>> import logging
        >>> from aws_lambda_powertools import Logger
        >>>
        >>> logger = Logger(service="payment", utc=True)

    **Brings message as the first key in log statements**

        >>> # app.py
        >>> import logging
        >>> from aws_lambda_powertools import Logger
        >>>
        >>> logger = Logger(service="payment", log_record_order=["message"])

    **Logging to a file instead of standard output for testing**

        >>> # app.py
        >>> import logging
        >>> from aws_lambda_powertools import Logger
        >>>
        >>> logger = Logger(service="payment", logger_handler=logging.FileHandler("log.json"))

    Raises
    ------
    InvalidLoggerSamplingRateError
        When sampling rate provided is not a float
    """

    def __init__(
        self,
        service: Optional[str] = None,
        level: Union[str, int, None] = None,
        child: bool = False,
        sampling_rate: Optional[float] = None,
        stream: Optional[IO[str]] = None,
        logger_formatter: Optional[PowertoolsFormatter] = None,
        logger_handler: Optional[logging.Handler] = None,
        json_serializer: Optional[Callable[[Dict], str]] = None,
        json_deserializer: Optional[Callable[[Union[Dict, str, bool, int, float]], str]] = None,
        json_default: Optional[Callable[[Any], Any]] = None,
        datefmt: Optional[str] = None,
        use_datetime_directive: bool = False,
        log_record_order: Optional[List[str]] = None,
        utc: bool = False,
        use_rfc3339: bool = False,
        **kwargs,
    ):
        """Resolve settings from arguments and environment, then configure the wrapped logger

        Explicit arguments are resolved against their environment variables,
        the wrapped `logging` logger is looked up by name, and the formatter
        related arguments are bundled and handed to `_init_logger`.
        """
        # Values read from the environment, used when the matching argument is not given
        service_from_env = os.getenv(constants.SERVICE_NAME_ENV, "service_undefined")
        sampling_rate_from_env = os.getenv(constants.LOGGER_LOG_SAMPLING_RATE)
        deduplication_from_env = os.getenv(constants.LOGGER_LOG_DEDUPLICATION_ENV, "false")

        self.service = resolve_env_var_choice(choice=service, env=service_from_env)
        self.sampling_rate = resolve_env_var_choice(choice=sampling_rate, env=sampling_rate_from_env)
        self.child = child
        self.logger_formatter = logger_formatter
        self.logger_handler = logger_handler or logging.StreamHandler(stream)
        self.log_level = self._get_log_level(level)
        self._is_deduplication_disabled = resolve_truthy_env_var_choice(env=deduplication_from_env)
        self._default_log_keys = dict(service=self.service, sampling_rate=self.sampling_rate)

        # Must be called directly from __init__: child logger naming walks a fixed
        # number of frames up the stack to find the module that instantiated Logger
        self._logger = self._get_logger()

        # NOTE: Formatter options are explicit parameters (rather than hidden in kwargs)
        # so IDEs can autocomplete them; they are regrouped here for the formatter
        formatter_options = dict(
            json_serializer=json_serializer,
            json_deserializer=json_deserializer,
            json_default=json_default,
            datefmt=datefmt,
            use_datetime_directive=use_datetime_directive,
            log_record_order=log_record_order,
            utc=utc,
            use_rfc3339=use_rfc3339,
        )

        self._init_logger(formatter_options=formatter_options, **kwargs)

    # Prevent __getattr__ from shielding unknown attribute errors in type checkers
    # https://github.com/awslabs/aws-lambda-powertools-python/issues/1660
    if not TYPE_CHECKING:

        def __getattr__(self, name):
            # Proxy attributes not found to actual logger to support backward compatibility
            # https://github.com/awslabs/aws-lambda-powertools-python/issues/97
            return getattr(self._logger, name)

    def _get_logger(self):
        """Returns a Logger named {self.service}, or {self.service.filename} for child loggers"""
        logger_name = self.service
        if self.child:
            logger_name = f"{self.service}.{self._get_caller_filename()}"

        return logging.getLogger(logger_name)

    def _init_logger(self, formatter_options: Optional[Dict] = None, **kwargs):
        """Configures new logger"""

        # Skip configuration if it's a child logger or a pre-configured logger
        # to prevent the following:
        #   a) multiple handlers being attached
        #   b) different sampling mechanisms
        #   c) multiple messages from being logged as handlers can be duplicated
        is_logger_preconfigured = getattr(self._logger, "init", False)
        if self.child or is_logger_preconfigured:
            return

        self._configure_sampling()
        self._logger.setLevel(self.log_level)
        self._logger.addHandler(self.logger_handler)
        self.structure_logs(formatter_options=formatter_options, **kwargs)

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        self._logger.findCaller = self.findCaller

        # Pytest Live Log feature duplicates log records for colored output
        # but we explicitly add a filter for log deduplication.
        # This flag disables this protection when you explicit want logs to be duplicated (#262)
        if not self._is_deduplication_disabled:
            logger.debug("Adding filter in root logger to suppress child logger records to bubble up")
            for handler in logging.root.handlers:
                # It'll add a filter to suppress any child logger from self.service
                # Example: `Logger(service="order")`, where service is Order
                # It'll reject all loggers starting with `order` e.g. order.checkout, order.shared
                handler.addFilter(SuppressFilter(self.service))

        # as per bug in #249, we should not be pre-configuring an existing logger
        # therefore we set a custom attribute in the Logger that will be returned
        # std logging will return the same Logger with our attribute if name is reused
        logger.debug(f"Marking logger {self.service} as preconfigured")
        self._logger.init = True

    def _configure_sampling(self):
        """Dynamically set log level based on sampling rate

        Raises
        ------
        InvalidLoggerSamplingRateError
            When sampling rate provided is not a float
        """
        try:
            if self.sampling_rate and random.random() <= float(self.sampling_rate):
                logger.debug("Setting log level to Debug due to sampling rate")
                self.log_level = logging.DEBUG
        except ValueError:
            raise InvalidLoggerSamplingRateError(
                f"Expected a float value ranging 0 to 1, but received {self.sampling_rate} instead."
                f"Please review POWERTOOLS_LOGGER_SAMPLE_RATE environment variable."
            )

    def inject_lambda_context(
        self,
        lambda_handler: Optional[Callable[[Dict, Any], Any]] = None,
        log_event: Optional[bool] = None,
        correlation_id_path: Optional[str] = None,
        clear_state: Optional[bool] = False,
    ):
        """Decorator that adds Lambda context details to the logger before the handler runs

        Usable both bare (`@logger.inject_lambda_context`) and with arguments
        (`@logger.inject_lambda_context(log_event=True)`).

        Parameters
        ----------
        lambda_handler : Callable
            Handler to wrap; None when the decorator is used with arguments
        log_event : bool, optional
            Log the incoming event at INFO level, by default False
        correlation_id_path: str, optional
            JMESPath expression evaluated against the event to obtain the correlation id
        clear_state : bool, optional
            Replace previously added custom keys instead of appending to them

        Environment variables
        ---------------------
        POWERTOOLS_LOGGER_LOG_EVENT : str
            instruct logger to log Lambda Event (e.g. `"true", "True", "TRUE"`)

        Example
        -------
        **Captures Lambda contextual runtime info (e.g memory, arn, req_id)**

            from aws_lambda_powertools import Logger

            logger = Logger(service="payment")

            @logger.inject_lambda_context
            def handler(event, context):
                logger.info("Hello")

        **Captures Lambda contextual runtime info and logs incoming request**

            from aws_lambda_powertools import Logger

            logger = Logger(service="payment")

            @logger.inject_lambda_context(log_event=True)
            def handler(event, context):
                logger.info("Hello")

        Returns
        -------
        decorate : Callable
            Decorated lambda handler
        """

        # No handler yet means the decorator was used with arguments:
        # hand back this same method with those arguments already bound
        if lambda_handler is None:
            logger.debug("Decorator called with parameters")
            bound_options = {
                "log_event": log_event,
                "correlation_id_path": correlation_id_path,
                "clear_state": clear_state,
            }
            return functools.partial(self.inject_lambda_context, **bound_options)

        log_event_env = os.getenv(constants.LOGGER_LOG_EVENT_ENV, "false")
        log_event = resolve_truthy_env_var_choice(env=log_event_env, choice=log_event)

        @functools.wraps(lambda_handler)
        def decorate(event, context, *args, **kwargs):
            lambda_context = build_lambda_context_model(context)
            cold_start = _is_cold_start()

            # clear_state discards existing custom keys; otherwise keys are added on top
            register_keys = self.structure_logs if clear_state else self.append_keys
            register_keys(cold_start=cold_start, **lambda_context.__dict__)

            if correlation_id_path:
                correlation_id = jmespath.search(correlation_id_path, event)
                self.set_correlation_id(correlation_id)

            if log_event:
                logger.debug("Event received")
                self.info(getattr(event, "raw_event", event))

            return lambda_handler(event, context, *args, **kwargs)

        return decorate

    def info(
        self,
        msg: object,
        *args,
        exc_info=None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.info(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.info(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def error(
        self,
        msg: object,
        *args,
        exc_info=None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.error(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.error(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def exception(
        self,
        msg: object,
        *args,
        exc_info=True,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.exception(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.exception(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def critical(
        self,
        msg: object,
        *args,
        exc_info=None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.critical(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.critical(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def warning(
        self,
        msg: object,
        *args,
        exc_info=None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.warning(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.warning(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def debug(
        self,
        msg: object,
        *args,
        exc_info=None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.debug(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.debug(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def append_keys(self, **additional_keys):
        self.registered_formatter.append_keys(**additional_keys)

    def remove_keys(self, keys: Iterable[str]):
        self.registered_formatter.remove_keys(keys)

    @property
    def registered_handler(self) -> logging.Handler:
        """Convenience property to access logger handler"""
        handlers = self._logger.parent.handlers if self.child else self._logger.handlers
        return handlers[0]

    @property
    def registered_formatter(self) -> BasePowertoolsFormatter:
        """Convenience property to access logger formatter"""
        return self.registered_handler.formatter  # type: ignore

    def structure_logs(self, append: bool = False, formatter_options: Optional[Dict] = None, **keys):
        """Sets logging formatting to JSON.

        Optionally, it can append keyword arguments
        to an existing logger, so it is available across future log statements.

        Last keyword argument and value wins if duplicated.

        Parameters
        ----------
        append : bool, optional
            append keys provided to logger formatter, by default False
        formatter_options : dict, optional
            LambdaPowertoolsFormatter options to be propagated, by default {}
        """
        formatter_options = formatter_options or {}

        # There are 3 operational modes for this method
        ## 1. Register a Powertools Formatter for the first time
        ## 2. Append new keys to the current logger formatter; deprecated in favour of append_keys
        ## 3. Add new keys and discard existing to the registered formatter

        # Mode 1
        log_keys = {**self._default_log_keys, **keys}
        is_logger_preconfigured = getattr(self._logger, "init", False)
        if not is_logger_preconfigured:
            formatter = self.logger_formatter or LambdaPowertoolsFormatter(**formatter_options, **log_keys)  # type: ignore # noqa: E501
            self.registered_handler.setFormatter(formatter)

            # when using a custom Lambda Powertools Formatter
            # standard and custom keys that are not Powertools Formatter parameters should be appended
            # and custom keys that might happen to be Powertools Formatter parameters should be discarded
            # this prevents adding them as custom keys, for example, `json_default=<callable>`
            # see https://github.com/awslabs/aws-lambda-powertools-python/issues/1263
            custom_keys = {k: v for k, v in log_keys.items() if k not in RESERVED_FORMATTER_CUSTOM_KEYS}
            return self.registered_formatter.append_keys(**custom_keys)

        # Mode 2 (legacy)
        if append:
            # Maintenance: Add deprecation warning for major version
            return self.append_keys(**keys)

        # Mode 3
        self.registered_formatter.clear_state()
        self.registered_formatter.append_keys(**log_keys)

    def set_correlation_id(self, value: Optional[str]):
        """Sets the correlation_id in the logging json

        Parameters
        ----------
        value : str, optional
            Value for the correlation id. None will remove the correlation_id
        """
        self.append_keys(correlation_id=value)

    def get_correlation_id(self) -> Optional[str]:
        """Read the `correlation_id` log key from the registered formatter

        Returns
        -------
        str, optional
            The stored correlation id; None when the registered formatter is not
            a LambdaPowertoolsFormatter or no correlation id is stored
        """
        formatter = self.registered_formatter
        if not isinstance(formatter, LambdaPowertoolsFormatter):
            return None
        return formatter.log_format.get("correlation_id")

    @staticmethod
    def _get_log_level(level: Union[str, int, None]) -> Union[str, int]:
        """Returns preferred log level set by the customer in upper case"""
        if isinstance(level, int):
            return level

        log_level: Optional[str] = level or os.getenv("LOG_LEVEL")
        if log_level is None:
            return logging.INFO

        return log_level.upper()

    @staticmethod
    def _get_caller_filename():
        """Return the module name (`__name__`) of the frame three calls above this one

        Despite the method name, the value comes from the frame's globals
        `__name__`, not from a file path. The result depends on call depth:
        this method is expected to be called from `_get_logger()`, which is
        itself called from `__init__`.
        """
        # frame                      => this method, _get_caller_filename()
        # frame.f_back               => _get_logger()
        # frame.f_back.f_back        => __init__()
        # frame.f_back.f_back.f_back => code that instantiated Logger
        frame = inspect.currentframe()
        caller_frame = frame.f_back.f_back.f_back
        return caller_frame.f_globals["__name__"]

    # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
    def findCaller(self, stack_info=False, stacklevel=2):  # pragma: no cover
        """
        Find the stack frame of the caller so that we can note the source
        file name, line number and function name.

        Parameters
        ----------
        stack_info : bool, optional
            When true, a formatted stack trace of the selected frame is returned as well
        stacklevel : int, optional
            Number of frames to walk up; frames for which `_is_internal_frame`
            is true are skipped without being counted

        Returns
        -------
        tuple
            (file name, line number, function name, stack trace text or None)
        """
        f = logging.currentframe()  # noqa: VNE001
        # On some versions of IronPython, currentframe() returns None if
        # IronPython isn't run with -X:Frames.
        if f is None:
            return "(unknown file)", 0, "(unknown function)", None
        # Walk up the stack until `stacklevel` countable frames have been passed
        while stacklevel > 0:
            next_f = f.f_back
            if next_f is None:
                ## We've got options here.
                ## If we want to use the last (deepest) frame:
                break
                ## If we want to mimic the warnings module:
                # return ("sys", 1, "(unknown function)", None) # noqa: E800
                ## If we want to be pedantic:  # noqa: E800
                # raise ValueError("call stack is not deep enough") # noqa: E800
            f = next_f  # noqa: VNE001
            # Only frames not flagged by _is_internal_frame count towards stacklevel
            if not _is_internal_frame(f):
                stacklevel -= 1
        co = f.f_code
        sinfo = None
        if stack_info:
            with io.StringIO() as sio:
                sio.write("Stack (most recent call last):\n")
                traceback.print_stack(f, file=sio)
                sinfo = sio.getvalue()
                # Drop the single trailing newline, if present
                if sinfo[-1] == "\n":
                    sinfo = sinfo[:-1]
        return co.co_filename, f.f_lineno, co.co_name, sinfo

Ancestors

Methods

def append_keys(self, **additional_keys)
Expand source code
def append_keys(self, **additional_keys):
    """Forward the given key=value pairs to the registered formatter's `append_keys`."""
    formatter = self.registered_formatter
    formatter.append_keys(**additional_keys)
def remove_keys(self, keys: Iterable[str])
Expand source code
def remove_keys(self, keys: Iterable[str]):
    """Ask the registered formatter to remove the given keys."""
    formatter = self.registered_formatter
    formatter.remove_keys(keys)

Inherited members

class Metrics (service: Optional[str] = None, namespace: Optional[str] = None)

Metrics create an EMF object with up to 100 metrics

Use Metrics when you need to create multiple metrics that have dimensions in common (e.g. service_name="payment").

Metrics keeps up to 100 metrics in memory, and they are shared across all its instances. That means it can be safely instantiated outside of a Lambda function, or anywhere else.

A decorator (log_metrics) is provided so metrics are published at the end of its execution. If more than 100 metrics are added at a given function execution, these metrics are serialized and published before adding a given metric to prevent metric truncation.

Example

Creates a few metrics and publish at the end of a function execution

from aws_lambda_powertools import Metrics

metrics = Metrics(namespace="ServerlessAirline", service="payment")

@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler():
    metrics.add_metric(name="BookingConfirmation", unit="Count", value=1)
    metrics.add_dimension(name="function_version", value="$LATEST")

    return True

Environment Variables

POWERTOOLS_METRICS_NAMESPACE : str
metric namespace
POWERTOOLS_SERVICE_NAME : str
service name used for default dimension

Parameters

service : str, optional
service name to be used as metric dimension, by default "service_undefined"
namespace : str, optional
Namespace for metrics

Raises

MetricUnitError
When metric unit isn't supported by CloudWatch
MetricValueError
When metric value isn't a number
SchemaValidationError
When metric object fails EMF schema validation
Expand source code
class Metrics(MetricManager):
    """Metrics create an EMF object with up to 100 metrics

    Use Metrics when you need to create multiple metrics that have
    dimensions in common (e.g. service_name="payment").

    Metrics keeps up to 100 metrics in memory, and they are shared across
    all its instances. That means it can be safely instantiated outside
    of a Lambda function, or anywhere else.

    A decorator (log_metrics) is provided so metrics are published at the end of its execution.
    If more than 100 metrics are added at a given function execution,
    these metrics are serialized and published before adding a given metric
    to prevent metric truncation.

    Example
    -------
    **Creates a few metrics and publish at the end of a function execution**

        from aws_lambda_powertools import Metrics

        metrics = Metrics(namespace="ServerlessAirline", service="payment")

        @metrics.log_metrics(capture_cold_start_metric=True)
        def lambda_handler():
            metrics.add_metric(name="BookingConfirmation", unit="Count", value=1)
            metrics.add_dimension(name="function_version", value="$LATEST")

            return True

    Environment variables
    ---------------------
    POWERTOOLS_METRICS_NAMESPACE : str
        metric namespace
    POWERTOOLS_SERVICE_NAME : str
        service name used for default dimension

    Parameters
    ----------
    service : str, optional
        service name to be used as metric dimension, by default "service_undefined"
    namespace : str, optional
        Namespace for metrics

    Raises
    ------
    MetricUnitError
        When metric unit isn't supported by CloudWatch
    MetricValueError
        When metric value isn't a number
    SchemaValidationError
        When metric object fails EMF schema validation
    """

    # Class-level stores: __init__ aliases these same dict objects on every instance,
    # which is what makes metrics, dimensions, metadata and default dimensions shared.
    _metrics: Dict[str, Any] = {}
    _dimensions: Dict[str, str] = {}
    _metadata: Dict[str, Any] = {}
    _default_dimensions: Dict[str, Any] = {}

    def __init__(self, service: Optional[str] = None, namespace: Optional[str] = None):
        self.metric_set = self._metrics
        self.service = service
        self.namespace: Optional[str] = namespace
        self.metadata_set = self._metadata
        self.default_dimensions = self._default_dimensions
        self.dimension_set = self._dimensions
        # default dimensions set earlier must be visible in the dimension set of this instance too
        self.dimension_set.update(**self._default_dimensions)

        super().__init__(
            metric_set=self.metric_set,
            dimension_set=self.dimension_set,
            namespace=self.namespace,
            metadata_set=self.metadata_set,
            service=self.service,
        )

    def set_default_dimensions(self, **dimensions) -> None:
        """Persist dimensions across Lambda invocations

        Each dimension is added to the current dimension set and also recorded
        in the default dimensions, which `clear_metrics` re-adds after clearing.

        Parameters
        ----------
        dimensions : Dict[str, Any], optional
            metric dimensions as key=value

        Example
        -------
        **Sets some default dimensions that will always be present across metrics and invocations**

            from aws_lambda_powertools import Metrics

            metrics = Metrics(namespace="ServerlessAirline", service="payment")
            metrics.set_default_dimensions(environment="demo", another="one")

            @metrics.log_metrics()
            def lambda_handler():
                return True
        """
        for name, value in dimensions.items():
            self.add_dimension(name, value)

        self.default_dimensions.update(**dimensions)

    def clear_default_dimensions(self) -> None:
        """Forget all persisted default dimensions

        Only the default dimensions store is emptied here; the current
        dimension set is left untouched by this call.
        """
        self.default_dimensions.clear()

    def clear_metrics(self) -> None:
        """Clear metrics, dimensions and metadata from memory, then re-add the default dimensions"""
        logger.debug("Clearing out existing metric set from memory")
        self.metric_set.clear()
        self.dimension_set.clear()
        self.metadata_set.clear()
        self.set_default_dimensions(**self.default_dimensions)  # re-add default dimensions

    def log_metrics(
        self,
        lambda_handler: Union[Callable[[Dict, Any], Any], Optional[Callable[[Dict, Any, Optional[Dict]], Any]]] = None,
        capture_cold_start_metric: bool = False,
        raise_on_empty_metrics: bool = False,
        default_dimensions: Optional[Dict[str, str]] = None,
    ):
        """Decorator to serialize and publish metrics at the end of a function execution.

        Be aware that the log_metrics **does call** the decorated function (e.g. lambda_handler).

        Any extra keyword arguments given to the decorated function are forwarded
        to the handler unchanged.

        Example
        -------
        **Lambda function using tracer and metrics decorators**

            from aws_lambda_powertools import Metrics, Tracer

            metrics = Metrics(service="payment")
            tracer = Tracer(service="payment")

            @tracer.capture_lambda_handler
            @metrics.log_metrics
            def handler(event, context):
                ...

        Parameters
        ----------
        lambda_handler : Callable[[Any, Any], Any], optional
            lambda function handler, by default None
        capture_cold_start_metric : bool, optional
            captures cold start metric, by default False
        raise_on_empty_metrics : bool, optional
            raise exception if no metrics are emitted, by default False
        default_dimensions: Dict[str, str], optional
            metric dimensions as key=value that will always be present

        Raises
        ------
        Exception
            Any error raised by the decorated handler is propagated
        """

        # If handler is None we've been called with parameters
        # Return a partial function with args filled
        if lambda_handler is None:
            logger.debug("Decorator called with parameters")
            return functools.partial(
                self.log_metrics,
                capture_cold_start_metric=capture_cold_start_metric,
                raise_on_empty_metrics=raise_on_empty_metrics,
                default_dimensions=default_dimensions,
            )

        @functools.wraps(lambda_handler)
        def decorate(event, context, **kwargs):
            # **kwargs are forwarded so handlers taking extra keyword arguments
            # (e.g. injected by another decorator) keep working when wrapped
            try:
                if default_dimensions:
                    self.set_default_dimensions(**default_dimensions)
                response = lambda_handler(event, context, **kwargs)
                if capture_cold_start_metric:
                    self.__add_cold_start_metric(context=context)
            finally:
                # Runs whether or not the handler raised, so metrics added before a failure are still published
                if not raise_on_empty_metrics and not self.metric_set:
                    warnings.warn("No metrics to publish, skipping")
                else:
                    # NOTE(review): with raise_on_empty_metrics=True an empty set still reaches
                    # serialize_metric_set(), which is presumably where the error is raised - confirm
                    metrics = self.serialize_metric_set()
                    self.clear_metrics()
                    print(json.dumps(metrics, separators=(",", ":")))

            return response

        return decorate

    def __add_cold_start_metric(self, context: Any) -> None:
        """Add cold start metric and function_name dimension

        The metric is emitted only while the module-level `is_cold_start` flag
        is truthy; the flag is then set to False.

        Parameters
        ----------
        context : Any
            Lambda context
        """
        global is_cold_start
        if is_cold_start:
            logger.debug("Adding cold start metric and function_name dimension")
            with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, namespace=self.namespace) as metric:
                metric.add_dimension(name="function_name", value=context.function_name)
                if self.service:
                    metric.add_dimension(name="service", value=str(self.service))
                is_cold_start = False

Ancestors

Methods

def clear_default_dimensions(self) ‑> None
Expand source code
def clear_default_dimensions(self) -> None:
    """Empty the persisted default dimensions in place."""
    persisted = self.default_dimensions
    persisted.clear()
def clear_metrics(self) ‑> None
Expand source code
def clear_metrics(self) -> None:
    """Drop buffered metrics, dimensions and metadata, then restore the default dimensions."""
    logger.debug("Clearing out existing metric set from memory")
    for store in (self.metric_set, self.dimension_set, self.metadata_set):
        store.clear()
    # default dimensions are meant to outlive a clear, so add them back
    self.set_default_dimensions(**self.default_dimensions)
def log_metrics(self, lambda_handler: Union[Callable[[Dict[~KT, ~VT], Any], Any], Callable[[Dict[~KT, ~VT], Any, Optional[Dict[~KT, ~VT]]], Any], None] = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, default_dimensions: Optional[Dict[str, str]] = None)

Decorator to serialize and publish metrics at the end of a function execution.

Be aware that log_metrics does call the decorated function (e.g. lambda_handler).

Example

Lambda function using tracer and metrics decorators

from aws_lambda_powertools import Metrics, Tracer

metrics = Metrics(service="payment")
tracer = Tracer(service="payment")

@tracer.capture_lambda_handler
@metrics.log_metrics
def handler(event, context):
        ...

Parameters

lambda_handler : Callable[[Any, Any], Any], optional
lambda function handler, by default None
capture_cold_start_metric : bool, optional
captures cold start metric, by default False
raise_on_empty_metrics : bool, optional
raise exception if no metrics are emitted, by default False
default_dimensions : Dict[str, str], optional
metric dimensions as key=value that will always be present

Raises

e
Propagate error received
Expand source code
def log_metrics(
    self,
    lambda_handler: Union[Callable[[Dict, Any], Any], Optional[Callable[[Dict, Any, Optional[Dict]], Any]]] = None,
    capture_cold_start_metric: bool = False,
    raise_on_empty_metrics: bool = False,
    default_dimensions: Optional[Dict[str, str]] = None,
):
    """Decorator to serialize and publish metrics at the end of a function execution.

    Be aware that the log_metrics **does call** the decorated function (e.g. lambda_handler).

    Any extra keyword arguments given to the decorated function are forwarded
    to the handler unchanged.

    Example
    -------
    **Lambda function using tracer and metrics decorators**

        from aws_lambda_powertools import Metrics, Tracer

        metrics = Metrics(service="payment")
        tracer = Tracer(service="payment")

        @tracer.capture_lambda_handler
        @metrics.log_metrics
        def handler(event, context):
            ...

    Parameters
    ----------
    lambda_handler : Callable[[Any, Any], Any], optional
        lambda function handler, by default None
    capture_cold_start_metric : bool, optional
        captures cold start metric, by default False
    raise_on_empty_metrics : bool, optional
        raise exception if no metrics are emitted, by default False
    default_dimensions: Dict[str, str], optional
        metric dimensions as key=value that will always be present

    Raises
    ------
    Exception
        Any error raised by the decorated handler is propagated
    """

    # If handler is None we've been called with parameters
    # Return a partial function with args filled
    if lambda_handler is None:
        logger.debug("Decorator called with parameters")
        return functools.partial(
            self.log_metrics,
            capture_cold_start_metric=capture_cold_start_metric,
            raise_on_empty_metrics=raise_on_empty_metrics,
            default_dimensions=default_dimensions,
        )

    @functools.wraps(lambda_handler)
    def decorate(event, context, **kwargs):
        # **kwargs are forwarded so handlers taking extra keyword arguments
        # (e.g. injected by another decorator) keep working when wrapped
        try:
            if default_dimensions:
                self.set_default_dimensions(**default_dimensions)
            response = lambda_handler(event, context, **kwargs)
            if capture_cold_start_metric:
                self.__add_cold_start_metric(context=context)
        finally:
            # Runs whether or not the handler raised, so metrics added before a failure are still published
            if not raise_on_empty_metrics and not self.metric_set:
                warnings.warn("No metrics to publish, skipping")
            else:
                metrics = self.serialize_metric_set()
                self.clear_metrics()
                print(json.dumps(metrics, separators=(",", ":")))

        return response

    return decorate
def set_default_dimensions(self, **dimensions) ‑> None

Persist dimensions across Lambda invocations

Parameters

dimensions : Dict[str, Any], optional
metric dimensions as key=value

Example

Sets some default dimensions that will always be present across metrics and invocations

from aws_lambda_powertools import Metrics

metrics = Metrics(namespace="ServerlessAirline", service="payment")
metrics.set_default_dimensions(environment="demo", another="one")

@metrics.log_metrics()
def lambda_handler():
       return True
Expand source code
def set_default_dimensions(self, **dimensions) -> None:
    """Register dimensions that should persist across Lambda invocations

    Every given dimension is first added through `add_dimension`, and the
    whole set is then merged into the persisted default dimensions.

    Parameters
    ----------
    dimensions : Dict[str, Any], optional
        metric dimensions as key=value

    Example
    -------
    **Default dimensions that stay present across metrics and invocations**

        from aws_lambda_powertools import Metrics

        metrics = Metrics(namespace="ServerlessAirline", service="payment")
        metrics.set_default_dimensions(environment="demo", another="one")

        @metrics.log_metrics()
        def lambda_handler():
            return True
    """
    for dimension_name in dimensions:
        self.add_dimension(dimension_name, dimensions[dimension_name])

    # merged only after every dimension was added successfully
    self.default_dimensions.update(dimensions)

Inherited members

class Tracer (service: Optional[str] = None, disabled: Optional[bool] = None, auto_patch: Optional[bool] = None, patch_modules: Optional[Sequence[str]] = None, provider: Optional[BaseProvider] = None)

Tracer using AWS-XRay to provide decorators with known defaults for Lambda functions

When running locally, it detects whether it's running via SAM CLI, and if it is it returns dummy segments/subsegments instead.

By default, it patches all available libraries supported by X-Ray SDK. Patching is automatically disabled when running locally via SAM CLI or by any other means.

Ref: https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/thirdparty.html

Tracer keeps a copy of its configuration as it can be instantiated more than once. This is useful when you are using your own middlewares and want to utilize an existing Tracer. Make sure to set auto_patch=False in subsequent Tracer instances to avoid double patching.

Environment Variables

POWERTOOLS_TRACE_DISABLED : str
disable tracer (e.g. "true", "True", "TRUE")
POWERTOOLS_SERVICE_NAME : str
service name
POWERTOOLS_TRACER_CAPTURE_RESPONSE : str
toggle auto-capture of response as metadata (e.g. "true", "True", "TRUE"); read with a default of "true"
POWERTOOLS_TRACER_CAPTURE_ERROR : str
toggle auto-capture of error as metadata (e.g. "true", "True", "TRUE"); read with a default of "true"

Parameters

service : str
Service name that will be appended in all tracing metadata
auto_patch : bool
Patch existing imported modules during initialization, by default True
disabled : bool
Flag to explicitly disable tracing, useful when running/testing locally (environment variable: POWERTOOLS_TRACE_DISABLED="true")
patch_modules : Optional[Sequence[str]]
Tuple of modules supported by tracing provider to patch, by default all modules are patched
provider : BaseProvider
Tracing provider, by default it is aws_xray_sdk.core.xray_recorder

Returns

Tracer
Tracer instance with imported modules patched

Example

A Lambda function using Tracer

from aws_lambda_powertools import Tracer
tracer = Tracer(service="greeting")

@tracer.capture_method
def greeting(name: str) -> Dict:
    return {
        "name": name
    }

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> Dict:
    print("Received event from Lambda...")
    response = greeting(name="Heitor")
    return response

Booking Lambda function using Tracer that adds additional annotation/metadata

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
def confirm_booking(booking_id: str) -> Dict:
        resp = add_confirmation(booking_id)

        tracer.put_annotation("BookingConfirmation", resp["requestId"])
        tracer.put_metadata("Booking confirmation", resp)

        return resp

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> Dict:
    print("Received event from Lambda...")
    booking_id = event.get("booking_id")
    response = confirm_booking(booking_id=booking_id)
    return response

A Lambda function using service name via POWERTOOLS_SERVICE_NAME

export POWERTOOLS_SERVICE_NAME="booking"
from aws_lambda_powertools import Tracer
tracer = Tracer()

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> Dict:
    print("Received event from Lambda...")
    response = greeting(name="Lessa")
    return response

Reuse an existing instance of Tracer anywhere in the code

# lambda_handler.py
from aws_lambda_powertools import Tracer
tracer = Tracer()

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> Dict:
    ...

# utils.py
from aws_lambda_powertools import Tracer
tracer = Tracer()
...

Limitations

  • Async handler not supported
Expand source code
class Tracer:
    """Tracer using AWS-XRay to provide decorators with known defaults for Lambda functions

    When running locally, it detects whether it's running via SAM CLI,
    and if it is it returns dummy segments/subsegments instead.

    By default, it patches all available libraries supported by X-Ray SDK. Patching is
    automatically disabled when running locally via SAM CLI or by any other means. \n
    Ref: https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/thirdparty.html

    Tracer keeps a copy of its configuration as it can be instantiated more than once. This
    is useful when you are using your own middlewares and want to utilize an existing Tracer.
    Make sure to set `auto_patch=False` in subsequent Tracer instances to avoid double patching.

    Environment variables
    ---------------------
    POWERTOOLS_TRACE_DISABLED : str
        disable tracer (e.g. `"true", "True", "TRUE"`)
    POWERTOOLS_SERVICE_NAME : str
        service name
    POWERTOOLS_TRACER_CAPTURE_RESPONSE : str
        disable auto-capture response as metadata (e.g. `"true", "True", "TRUE"`)
    POWERTOOLS_TRACER_CAPTURE_ERROR : str
        disable auto-capture error as metadata (e.g. `"true", "True", "TRUE"`)

    Parameters
    ----------
    service: str
        Service name that will be appended in all tracing metadata
    auto_patch: bool
        Patch existing imported modules during initialization, by default True
    disabled: bool
        Flag to explicitly disable tracing, useful when running/testing locally
        `Env POWERTOOLS_TRACE_DISABLED="true"`
    patch_modules: Optional[Sequence[str]]
        Tuple of modules supported by tracing provider to patch, by default all modules are patched
    provider: BaseProvider
        Tracing provider, by default it is aws_xray_sdk.core.xray_recorder

    Returns
    -------
    Tracer
        Tracer instance with imported modules patched

    Example
    -------
    **A Lambda function using Tracer**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="greeting")

        @tracer.capture_method
        def greeting(name: str) -> Dict:
            return {
                "name": name
            }

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            print("Received event from Lambda...")
            response = greeting(name="Heitor")
            return response

    **Booking Lambda function using Tracer that adds additional annotation/metadata**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        def confirm_booking(booking_id: str) -> Dict:
                resp = add_confirmation(booking_id)

                tracer.put_annotation("BookingConfirmation", resp["requestId"])
                tracer.put_metadata("Booking confirmation", resp)

                return resp

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            print("Received event from Lambda...")
            booking_id = event.get("booking_id")
            response = confirm_booking(booking_id=booking_id)
            return response

    **A Lambda function using service name via POWERTOOLS_SERVICE_NAME**

        export POWERTOOLS_SERVICE_NAME="booking"
        from aws_lambda_powertools import Tracer
        tracer = Tracer()

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            print("Received event from Lambda...")
            response = greeting(name="Lessa")
            return response

    **Reuse an existing instance of Tracer anywhere in the code**

        # lambda_handler.py
        from aws_lambda_powertools import Tracer
        tracer = Tracer()

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            ...

        # utils.py
        from aws_lambda_powertools import Tracer
        tracer = Tracer()
        ...

    Limitations
    -----------
    * Async handler not supported
    """

    _default_config: Dict[str, Any] = {
        "service": "",
        "disabled": False,
        "auto_patch": True,
        "patch_modules": None,
        "provider": None,
    }
    _config = copy.copy(_default_config)

    def __init__(
        self,
        service: Optional[str] = None,
        disabled: Optional[bool] = None,
        auto_patch: Optional[bool] = None,
        patch_modules: Optional[Sequence[str]] = None,
        provider: Optional[BaseProvider] = None,
    ):
        """Build the tracer configuration, then apply it to this instance

        After the configuration is built, the provider is disabled when tracing
        is disabled, modules are patched when auto patching is on, and trace
        batching is turned off when the provider is the X-Ray one.
        """
        self.__build_config(
            service=service,
            disabled=disabled,
            auto_patch=auto_patch,
            patch_modules=patch_modules,
            provider=provider,
        )

        # read the configuration back once it has been built
        resolved = self._config
        self.provider: BaseProvider = resolved["provider"]
        self.disabled = resolved["disabled"]
        self.service = resolved["service"]
        self.auto_patch = resolved["auto_patch"]

        if self.disabled:
            self._disable_tracer_provider()

        if self.auto_patch:
            # patches with the modules passed to this call
            self.patch(modules=patch_modules)

        if self._is_xray_provider():
            self._disable_xray_trace_batching()

    def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]):
        """Add an annotation through the tracing provider

        Nothing is sent to the provider when tracing is disabled.

        Parameters
        ----------
        key : str
            Annotation key
        value : Union[str, numbers.Number, bool]
            Value for annotation

        Example
        -------
        Custom annotation for a pseudo service named payment

            tracer = Tracer(service="payment")
            tracer.put_annotation("PaymentStatus", "CONFIRMED")
        """
        if not self.disabled:
            logger.debug(f"Annotating on key '{key}' with '{value}'")
            self.provider.put_annotation(key=key, value=value)
            return

        logger.debug("Tracing has been disabled, aborting put_annotation")

    def put_metadata(self, key: str, value: Any, namespace: Optional[str] = None):
        """Add metadata through the tracing provider

        Nothing is sent to the provider when tracing is disabled.

        Parameters
        ----------
        key : str
            Metadata key
        value : any
            Value for metadata
        namespace : str, optional
            Namespace the metadata is stored under; the tracer's service name
            is used when this is None or otherwise falsy

        Example
        -------
        Custom metadata for a pseudo service named payment

            tracer = Tracer(service="payment")
            response = collect_payment()
            tracer.put_metadata("Payment collection", response)
        """
        if not self.disabled:
            # fall back to the service name when no namespace was supplied
            namespace = namespace if namespace else self.service
            logger.debug(f"Adding metadata on key '{key}' with '{value}' at namespace '{namespace}'")
            self.provider.put_metadata(key=key, value=value, namespace=namespace)
            return

        logger.debug("Tracing has been disabled, aborting put_metadata")

    def patch(self, modules: Optional[Sequence[str]] = None):
        """Patch modules for instrumentation via the tracing provider

        Nothing is patched when tracing is disabled. With no modules given,
        the provider's `patch_all` is used; otherwise only the given modules
        are handed to the provider's `patch`.

        Parameters
        ----------
        modules : Optional[Sequence[str]]
            Modules to be patched, by default None
        """
        if self.disabled:
            logger.debug("Tracing has been disabled, aborting patch")
        elif modules is None:
            self.provider.patch_all()
        else:
            self.provider.patch(modules)

    def capture_lambda_handler(
        self,
        lambda_handler: Union[Callable[[Dict, Any], Any], Optional[Callable[[Dict, Any, Optional[Dict]], Any]]] = None,
        capture_response: Optional[bool] = None,
        capture_error: Optional[bool] = None,
    ):
        """Decorator that runs a Lambda handler inside a subsegment

        The subsegment is named `## <handler name>`. The handler's response or
        the exception it raises is handed to the tracer's metadata helpers, and
        the subsegment is always annotated with `ColdStart` (plus `Service`
        when a service name is set).

        Parameters
        ----------
        lambda_handler : Callable
            Handler to wrap; when None, a decorator with the given options pre-filled is returned
        capture_response : bool, optional
            Explicit choice on recording the handler's response as metadata; it is resolved
            together with the response-capture environment variable (read with default "true")
        capture_error : bool, optional
            Explicit choice on recording the handler's error as metadata; it is resolved
            together with the error-capture environment variable (read with default "true")

        Example
        -------
        **Lambda function using capture_lambda_handler decorator**

            tracer = Tracer(service="payment")
            @tracer.capture_lambda_handler
            def handler(event, context):
                ...

        **Preventing Tracer to log response as metadata**

            tracer = Tracer(service="payment")
            @tracer.capture_lambda_handler(capture_response=False)
            def handler(event, context):
                ...

        Raises
        ------
        err
            Exception raised by the handler, re-raised after it was recorded
        """
        # Called as @capture_lambda_handler(...): no handler yet, so hand back
        # this same method with the options already bound
        if lambda_handler is None:
            logger.debug("Decorator called with parameters")
            decorator_options = {"capture_response": capture_response, "capture_error": capture_error}
            return functools.partial(self.capture_lambda_handler, **decorator_options)

        lambda_handler_name = lambda_handler.__name__

        response_env = os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true")
        capture_response = resolve_truthy_env_var_choice(env=response_env, choice=capture_response)

        error_env = os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true")
        capture_error = resolve_truthy_env_var_choice(env=error_env, choice=capture_error)

        @functools.wraps(lambda_handler)
        def decorate(event, context, **kwargs):
            global is_cold_start

            subsegment_name = f"## {lambda_handler_name}"
            with self.provider.in_subsegment(name=subsegment_name) as subsegment:
                try:
                    logger.debug("Calling lambda handler")
                    response = lambda_handler(event, context, **kwargs)
                    logger.debug("Received lambda handler response successfully")
                    self._add_response_as_metadata(
                        method_name=lambda_handler_name,
                        data=response,
                        subsegment=subsegment,
                        capture_response=capture_response,
                    )
                except Exception as err:
                    logger.exception(f"Exception received from {lambda_handler_name}")
                    self._add_full_exception_as_metadata(
                        method_name=lambda_handler_name,
                        error=err,
                        subsegment=subsegment,
                        capture_error=capture_error,
                    )

                    raise
                finally:
                    # annotate on success and on failure alike
                    logger.debug("Annotating cold start")
                    subsegment.put_annotation(key="ColdStart", value=is_cold_start)

                    # the flag is flipped only after its current value was annotated
                    if is_cold_start:
                        is_cold_start = False

                    if self.service:
                        subsegment.put_annotation(key="Service", value=self.service)

                return response

        return decorate

    # see #465
    @overload
    def capture_method(self, method: "AnyCallableT") -> "AnyCallableT":
        ...  # pragma: no cover

    @overload
    def capture_method(
        self,
        method: None = None,
        capture_response: Optional[bool] = None,
        capture_error: Optional[bool] = None,
    ) -> Callable[["AnyCallableT"], "AnyCallableT"]:
        ...  # pragma: no cover

    def capture_method(
        self,
        method: Optional[AnyCallableT] = None,
        capture_response: Optional[bool] = None,
        capture_error: Optional[bool] = None,
    ) -> AnyCallableT:
        """Decorator that runs an arbitrary function inside its own subsegment

        The response and any raised exception can be stored as metadata on a
        subsegment named `## <method_module.method_qualifiedname>`.
        See [Qualified name for classes and functions](https://peps.python.org/pep-3155/).

        When running [async functions concurrently](https://docs.python.org/3/library/asyncio-task.html#id6),
        methods may impact each other's subsegment, and can trigger
        an AlreadyEndedException from X-Ray due to their async nature.

        For this use case, either use `capture_method` only where
        `asyncio.gather` is called, or use the `in_subsegment_async`
        context manager via our escape hatch mechanism - see examples.

        Parameters
        ----------
        method : Callable
            Method to annotate on. When omitted, the decorator was called with
            parameters and a partial bound to those parameters is returned instead
        capture_response : bool, optional
            Whether to include the method's response as metadata. It is resolved by
            `resolve_truthy_env_var_choice` together with the environment variable named by
            `constants.TRACER_CAPTURE_RESPONSE_ENV`, which is read with a default of "true"
        capture_error : bool, optional
            Whether to include the method's error as metadata. It is resolved the same way
            using `constants.TRACER_CAPTURE_ERROR_ENV`, which is read with a default of "true"

        Example
        -------
        **Custom function using capture_method decorator**

            tracer = Tracer(service="payment")
            @tracer.capture_method
            def some_function():
                ...

        **Custom async method using capture_method decorator**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            async def confirm_booking(booking_id: str) -> Dict:
                resp = call_to_booking_service()

                tracer.put_annotation("BookingConfirmation", resp["requestId"])
                tracer.put_metadata("Booking confirmation", resp)

                return resp

            def lambda_handler(event: dict, context: Any) -> Dict:
                booking_id = event.get("booking_id")
                asyncio.run(confirm_booking(booking_id=booking_id))

        **Custom generator function using capture_method decorator**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            def bookings_generator(booking_id):
                resp = call_to_booking_service()
                yield resp[0]
                yield resp[1]

            def lambda_handler(event: dict, context: Any) -> Dict:
                gen = bookings_generator(booking_id=booking_id)
                result = list(gen)

        **Custom generator context manager using capture_method decorator**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            @contextlib.contextmanager
            def booking_actions(booking_id):
                resp = call_to_booking_service()
                yield "example result"
                cleanup_stuff()

            def lambda_handler(event: dict, context: Any) -> Dict:
                booking_id = event.get("booking_id")

                with booking_actions(booking_id=booking_id) as booking:
                    result = booking

        **Tracing nested async calls**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            async def get_identity():
                ...

            @tracer.capture_method
            async def long_async_call():
                ...

            @tracer.capture_method
            async def async_tasks():
                await get_identity()
                ret = await long_async_call()

                return { "task": "done", **ret }

        **Safely tracing concurrent async calls with decorator**

        This may not be needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            async def get_identity():
                async with aioboto3.client("sts") as sts:
                    account = await sts.get_caller_identity()
                    return account

            async def long_async_call():
                ...

            @tracer.capture_method
            async def async_tasks():
                _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

                return { "task": "done", **ret }

        **Safely tracing each concurrent async calls with escape hatch**

        This may not be needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            async def get_identity():
                async with tracer.provider.in_subsegment_async("## get_identity"):
                    ...

            async def long_async_call():
                async with tracer.provider.in_subsegment_async("## long_async_call"):
                    ...

            @tracer.capture_method
            async def async_tasks():
                _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

                return { "task": "done", **ret }

        Raises
        ------
        err
            Exception raised by method
        """
        # No method means we were used as `@tracer.capture_method(...)`:
        # hand back a partial that remembers the options and waits for the method
        if method is None:
            logger.debug("Decorator called with parameters")
            configured_decorator = functools.partial(
                self.capture_method, capture_response=capture_response, capture_error=capture_error
            )
            return cast(AnyCallableT, configured_decorator)

        # Example: app.ClassA.get_all  # noqa E800
        method_name = f"{method.__module__}.{method.__qualname__}"

        capture_response = resolve_truthy_env_var_choice(
            env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"), choice=capture_response
        )
        capture_error = resolve_truthy_env_var_choice(
            env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"), choice=capture_error
        )

        # Maintenance: Need a factory/builder here to simplify this now
        # Pick the wrapper matching the kind of callable, then apply it once below
        if inspect.iscoroutinefunction(method):
            decorate_with = self._decorate_async_function
        elif inspect.isgeneratorfunction(method):
            decorate_with = self._decorate_generator_function
        elif hasattr(method, "__wrapped__") and inspect.isgeneratorfunction(method.__wrapped__):  # type: ignore
            # A wrapper around a generator function, as in the contextlib.contextmanager example above
            decorate_with = self._decorate_generator_function_with_context_manager
        else:
            decorate_with = self._decorate_sync_function

        return decorate_with(
            method=method, capture_response=capture_response, capture_error=capture_error, method_name=method_name
        )

    def _decorate_async_function(
        self,
        method: Callable,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ):
        """Wrap a coroutine function so every awaited call runs in its own subsegment

        Parameters
        ----------
        method : Callable
            Coroutine function to wrap
        capture_response : Union[bool, str], optional
            Forwarded to `_add_response_as_metadata` to decide whether the result is stored
        capture_error : Union[bool, str], optional
            Forwarded to `_add_full_exception_as_metadata` to decide whether the error is stored
        method_name : str, optional
            Name used for the subsegment (`## <method_name>`), log messages and metadata keys

        Returns
        -------
        Callable
            Coroutine function carrying `method`'s metadata via `functools.wraps`
        """
        subsegment_name = f"## {method_name}"

        @functools.wraps(method)
        async def traced_coroutine(*args, **kwargs):
            async with self.provider.in_subsegment_async(name=subsegment_name) as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    result = await method(*args, **kwargs)
                    # Stays inside the try: a failure while storing the response is logged and re-raised too
                    self._add_response_as_metadata(
                        method_name=method_name, data=result, subsegment=subsegment, capture_response=capture_response
                    )
                except Exception as exc:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name, error=exc, subsegment=subsegment, capture_error=capture_error
                    )
                    raise
                else:
                    return result

        return traced_coroutine

    def _decorate_generator_function(
        self,
        method: Callable,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ):
        """Wrap a generator function so its iteration runs inside a subsegment

        The wrapper delegates to `method` with `yield from`; the value the generator
        returns when it finishes is what gets offered as response metadata.

        Parameters
        ----------
        method : Callable
            Generator function to wrap
        capture_response : Union[bool, str], optional
            Forwarded to `_add_response_as_metadata` to decide whether the result is stored
        capture_error : Union[bool, str], optional
            Forwarded to `_add_full_exception_as_metadata` to decide whether the error is stored
        method_name : str, optional
            Name used for the subsegment (`## <method_name>`), log messages and metadata keys

        Returns
        -------
        Callable
            Generator function carrying `method`'s metadata via `functools.wraps`
        """
        subsegment_name = f"## {method_name}"

        @functools.wraps(method)
        def traced_generator(*args, **kwargs):
            with self.provider.in_subsegment(name=subsegment_name) as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    final_value = yield from method(*args, **kwargs)
                    # Stays inside the try: a failure while storing the response is logged and re-raised too
                    self._add_response_as_metadata(
                        method_name=method_name,
                        data=final_value,
                        subsegment=subsegment,
                        capture_response=capture_response,
                    )
                except Exception as exc:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name, error=exc, subsegment=subsegment, capture_error=capture_error
                    )
                    raise
                else:
                    return final_value

        return traced_generator

    def _decorate_generator_function_with_context_manager(
        self,
        method: Callable,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ):
        """Wrap a context-manager factory so the managed block runs inside a subsegment

        The wrapper is itself a `contextlib.contextmanager`: it enters the context
        manager returned by `method`, yields the same value to the caller, and offers
        that value as response metadata once the managed block has exited.

        Parameters
        ----------
        method : Callable
            Callable returning a context manager
        capture_response : Union[bool, str], optional
            Forwarded to `_add_response_as_metadata` to decide whether the yielded value is stored
        capture_error : Union[bool, str], optional
            Forwarded to `_add_full_exception_as_metadata` to decide whether the error is stored
        method_name : str, optional
            Name used for the subsegment (`## <method_name>`), log messages and metadata keys

        Returns
        -------
        Callable
            Context-manager factory carrying `method`'s metadata via `functools.wraps`
        """
        subsegment_name = f"## {method_name}"

        @functools.wraps(method)
        @contextlib.contextmanager
        def traced_context(*args, **kwargs):
            with self.provider.in_subsegment(name=subsegment_name) as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    with method(*args, **kwargs) as yielded_value:
                        yield yielded_value
                    # Only reached after the caller's `with` body and the inner manager both exit cleanly
                    self._add_response_as_metadata(
                        method_name=method_name,
                        data=yielded_value,
                        subsegment=subsegment,
                        capture_response=capture_response,
                    )
                except Exception as exc:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name, error=exc, subsegment=subsegment, capture_error=capture_error
                    )
                    raise

        return traced_context

    def _decorate_sync_function(
        self,
        method: AnyCallableT,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ) -> AnyCallableT:
        """Wrap a regular (synchronous) callable so every call runs in its own subsegment

        Parameters
        ----------
        method : AnyCallableT
            Callable to wrap
        capture_response : Union[bool, str], optional
            Forwarded to `_add_response_as_metadata` to decide whether the result is stored
        capture_error : Union[bool, str], optional
            Forwarded to `_add_full_exception_as_metadata` to decide whether the error is stored
        method_name : str, optional
            Name used for the subsegment (`## <method_name>`), log messages and metadata keys

        Returns
        -------
        AnyCallableT
            Wrapper carrying `method`'s metadata via `functools.wraps`
        """
        subsegment_name = f"## {method_name}"

        @functools.wraps(method)
        def traced_call(*args, **kwargs):
            with self.provider.in_subsegment(name=subsegment_name) as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    result = method(*args, **kwargs)
                    # Stays inside the try: a failure while storing the response is logged and re-raised too
                    self._add_response_as_metadata(
                        method_name=method_name, data=result, subsegment=subsegment, capture_response=capture_response
                    )
                except Exception as exc:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name, error=exc, subsegment=subsegment, capture_error=capture_error
                    )
                    raise
                else:
                    return result

        return cast(AnyCallableT, traced_call)

    def _add_response_as_metadata(
        self,
        method_name: Optional[str] = None,
        data: Optional[Any] = None,
        subsegment: Optional[BaseSegment] = None,
        capture_response: Optional[Union[bool, str]] = None,
    ):
        """Store a response as metadata on the given subsegment

        Nothing is stored when `data` is None, when `capture_response` is falsy,
        or when there is no subsegment to write to.

        Parameters
        ----------
        method_name : str, optional
            prefix of the metadata key (`"<method_name> response"`), by default None
        data : Any, optional
            value stored as subsegment metadata, by default None
        subsegment : BaseSegment, optional
            existing subsegment to add metadata on, by default None
        capture_response : bool, optional
            the response is only stored when this is truthy, by default None
        """
        should_store = data is not None and capture_response and subsegment is not None
        if should_store:
            subsegment.put_metadata(key=f"{method_name} response", value=data, namespace=self.service)

    def _add_full_exception_as_metadata(
        self,
        method_name: str,
        error: Exception,
        subsegment: BaseSegment,
        capture_error: Optional[bool] = None,
    ):
        """Store the full exception object as metadata on the given subsegment

        Parameters
        ----------
        method_name : str
            prefix of the metadata key (`"<method_name> error"`)
        error : Exception
            exception object stored as subsegment metadata
        subsegment : BaseSegment
            existing subsegment to add metadata on
        capture_error : bool, optional
            the error is only stored when this is truthy, by default None
        """
        if capture_error:
            subsegment.put_metadata(key=f"{method_name} error", value=error, namespace=self.service)

    @staticmethod
    def _disable_tracer_provider():
        """Forcefully disables tracing through `aws_xray_sdk.global_sdk_config`"""
        logger.debug("Disabling tracer provider...")
        sdk_config = aws_xray_sdk.global_sdk_config
        sdk_config.set_sdk_enabled(False)

    @staticmethod
    def _is_tracer_disabled() -> Union[bool, str]:
        """Detects from the environment whether tracing should be disabled

        This check reports tracing as disabled in the following conditions:

        1. The environment variable named by `constants.TRACER_DISABLED_ENV`
           (logged as `POWERTOOLS_TRACE_DISABLED`) resolves to a truthy value
        2. The environment variable named by `constants.LAMBDA_TASK_ROOT_ENV` is unset or empty,
           e.g. running in Lambda Emulators, or locally where X-Ray Daemon will not be listening

        Explicitly disabling via constructor, e.g. `Tracer(disabled=True)`, is not handled here.

        Returns
        -------
        Union[bool, str]
            The resolved env var choice when it is truthy, True when the Lambda
            task root variable is missing, False otherwise
        """
        logger.debug("Verifying whether Tracing has been disabled")
        lambda_task_root = os.getenv(constants.LAMBDA_TASK_ROOT_ENV)
        disabled_by_env = resolve_truthy_env_var_choice(env=os.getenv(constants.TRACER_DISABLED_ENV, "false"))

        if disabled_by_env:
            logger.debug("Tracing has been disabled via env var POWERTOOLS_TRACE_DISABLED")
            return disabled_by_env

        if lambda_task_root:
            return False

        logger.debug("Running outside Lambda env; disabling Tracing")
        return True

    def __build_config(
        self,
        service: Optional[str] = None,
        disabled: Optional[bool] = None,
        auto_patch: Optional[bool] = None,
        patch_modules: Optional[Sequence[str]] = None,
        provider: Optional[BaseProvider] = None,
    ):
        """Populates Tracer config for new and existing initializations

        Each setting prefers the explicitly passed value, then the value already
        stored in `self._config`. Only `provider` has a further fallback, which is
        the provider built by `_patch_xray_provider`.

        Parameters
        ----------
        service : str, optional
            Service name; resolved with the env var named by `constants.SERVICE_NAME_ENV`
        disabled : bool, optional
            Explicit disable flag; when None, `_is_tracer_disabled` decides.
            The result is OR-ed with the cached value, so a cached truthy value is kept
        auto_patch : bool, optional
            Overrides the cached `auto_patch` unless None
        patch_modules : Sequence[str], optional
            Overrides the cached `patch_modules` when truthy
        provider : BaseProvider, optional
            Overrides the cached provider when truthy
        """
        config = self._config
        resolved_disabled = self._is_tracer_disabled() if disabled is None else disabled
        resolved_service = resolve_env_var_choice(choice=service, env=os.getenv(constants.SERVICE_NAME_ENV))

        # Logic: Choose overridden option first, previously cached config, or default if available
        config["provider"] = provider or config["provider"] or self._patch_xray_provider()
        config["auto_patch"] = config["auto_patch"] if auto_patch is None else auto_patch
        config["service"] = resolved_service or config["service"]
        config["disabled"] = resolved_disabled or config["disabled"]
        config["patch_modules"] = patch_modules or config["patch_modules"]

    @classmethod
    def _reset_config(cls):
        cls._config = copy.copy(cls._default_config)

    def _patch_xray_provider(self):
        """Return the X-Ray recorder with `patch` and `patch_all` attached to it"""
        # Due to Lazy Import, we need to activate `core` attrib via import
        # we also need to include `patch`, `patch_all` methods
        # to ensure patch calls are done via the provider
        from aws_xray_sdk.core import xray_recorder  # type: ignore

        xray_recorder.patch = aws_xray_sdk.core.patch
        xray_recorder.patch_all = aws_xray_sdk.core.patch_all

        return xray_recorder

    def _disable_xray_trace_batching(self):
        """Configure X-Ray SDK to send subsegment individually over batching

        Does nothing besides logging when tracing is disabled.
        Known issue: https://github.com/awslabs/aws-lambda-powertools-python/issues/283
        """
        if not self.disabled:
            aws_xray_sdk.core.xray_recorder.configure(streaming_threshold=0)
            return

        logger.debug("Tracing has been disabled, aborting streaming override")

    def _is_xray_provider(self):
        return "aws_xray_sdk" in self.provider.__module__

    def ignore_endpoint(self, hostname: Optional[str] = None, urls: Optional[List[str]] = None):
        """Exclude certain httplib requests from tracing, based on the hostname or URL being requested.

        > NOTE: If the provider is not xray, nothing will be added to ignore list

        Documentation
        --------------
        - https://github.com/aws/aws-xray-sdk-python#ignoring-httplib-requests

        Parameters
        ----------
        hostname : Optional, str
            The hostname is matched using the Python fnmatch library which does Unix glob style matching.
        urls: Optional, List[str]
            List of urls to ignore. Example `tracer.ignore_endpoint(urls=["/ignored-url"])`
        """
        if self._is_xray_provider():
            # Imported lazily so the X-Ray httplib extension is only loaded for the X-Ray provider
            from aws_xray_sdk.ext.httplib import add_ignored  # type: ignore

            add_ignored(hostname=hostname, urls=urls)

Methods

def capture_lambda_handler(self, lambda_handler: Union[Callable[[Dict[~KT, ~VT], Any], Any], Callable[[Dict[~KT, ~VT], Any, Optional[Dict[~KT, ~VT]]], Any], None] = None, capture_response: Optional[bool] = None, capture_error: Optional[bool] = None)

Decorator to create subsegment for lambda handlers

Because Lambda handlers follow the (event, context) signature, we can remove some of the boilerplate and also capture the handler's response, or any exception it throws, as metadata

Parameters

lambda_handler : Callable
Method to annotate on
capture_response : bool, optional
Instructs tracer to not include handler's response as metadata
capture_error : bool, optional
Instructs tracer to not include handler's error as metadata, by default True

Example

Lambda function using capture_lambda_handler decorator

tracer = Tracer(service="payment")
@tracer.capture_lambda_handler
def handler(event, context):
    ...

Preventing Tracer from logging the response as metadata

tracer = Tracer(service="payment")
@tracer.capture_lambda_handler(capture_response=False)
def handler(event, context):
    ...

Raises

err
Exception raised by method
Expand source code
def capture_lambda_handler(
    self,
    lambda_handler: Union[Callable[[Dict, Any], Any], Optional[Callable[[Dict, Any, Optional[Dict]], Any]]] = None,
    capture_response: Optional[bool] = None,
    capture_error: Optional[bool] = None,
):
    """Decorator that runs a Lambda handler inside its own subsegment

    Because Lambda handlers follow the (event, context) signature, we can remove
    some of the boilerplate and also capture the handler's response, or any
    exception it throws, as metadata. Every invocation is also annotated with
    `ColdStart` and, when a service name is set, with `Service`.

    Parameters
    ----------
    lambda_handler : Callable
        Method to annotate on. When omitted, the decorator was called with
        parameters and a partial bound to those parameters is returned instead
    capture_response : bool, optional
        Whether to include the handler's response as metadata. It is resolved by
        `resolve_truthy_env_var_choice` together with the environment variable named by
        `constants.TRACER_CAPTURE_RESPONSE_ENV`, which is read with a default of "true"
    capture_error : bool, optional
        Whether to include the handler's error as metadata. It is resolved the same way
        using `constants.TRACER_CAPTURE_ERROR_ENV`, which is read with a default of "true"

    Example
    -------
    **Lambda function using capture_lambda_handler decorator**

        tracer = Tracer(service="payment")
        @tracer.capture_lambda_handler
        def handler(event, context):
            ...

    **Preventing Tracer from logging the response as metadata**

        tracer = Tracer(service="payment")
        @tracer.capture_lambda_handler(capture_response=False)
        def handler(event, context):
            ...

    Raises
    ------
    err
        Exception raised by method
    """
    # No handler means we were used as `@tracer.capture_lambda_handler(...)`:
    # hand back a partial that remembers the options and waits for the handler
    if lambda_handler is None:
        logger.debug("Decorator called with parameters")
        return functools.partial(
            self.capture_lambda_handler, capture_response=capture_response, capture_error=capture_error
        )

    lambda_handler_name = lambda_handler.__name__
    subsegment_name = f"## {lambda_handler_name}"

    capture_response = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"), choice=capture_response
    )
    capture_error = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"), choice=capture_error
    )

    @functools.wraps(lambda_handler)
    def traced_handler(event, context, **kwargs):
        # Module-level flag: reported on every invocation and cleared after the first one
        global is_cold_start

        with self.provider.in_subsegment(name=subsegment_name) as subsegment:
            try:
                logger.debug("Calling lambda handler")
                response = lambda_handler(event, context, **kwargs)
                logger.debug("Received lambda handler response successfully")
                self._add_response_as_metadata(
                    method_name=lambda_handler_name,
                    data=response,
                    subsegment=subsegment,
                    capture_response=capture_response,
                )
            except Exception as exc:
                logger.exception(f"Exception received from {lambda_handler_name}")
                self._add_full_exception_as_metadata(
                    method_name=lambda_handler_name, error=exc, subsegment=subsegment, capture_error=capture_error
                )

                raise
            finally:
                # Annotations are written whether the handler succeeded or raised
                logger.debug("Annotating cold start")
                subsegment.put_annotation(key="ColdStart", value=is_cold_start)

                if is_cold_start:
                    is_cold_start = False

                if self.service:
                    subsegment.put_annotation(key="Service", value=self.service)

            return response

    return traced_handler
def capture_method(self, method: Optional[~AnyCallableT] = None, capture_response: Optional[bool] = None, capture_error: Optional[bool] = None) ‑> ~AnyCallableT

Decorator to create subsegment for arbitrary functions

It also captures both response and exceptions as metadata and creates a subsegment named ## <method_module.method_qualifiedname>

see here: Qualified name for classes and functions

When running async functions concurrently, methods may impact each other's subsegment, and can trigger an AlreadyEndedException from X-Ray due to their async nature.

For this use case, either use capture_method only where asyncio.gather is called, or use the in_subsegment_async context manager via our escape hatch mechanism - see examples.

Parameters

method : Callable
Method to annotate on
capture_response : bool, optional
Instructs tracer to not include method's response as metadata
capture_error : bool, optional
Instructs tracer to not include handler's error as metadata, by default True

Example

Custom function using capture_method decorator

tracer = Tracer(service="payment")
@tracer.capture_method
def some_function():
    ...

Custom async method using capture_method decorator

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
async def confirm_booking(booking_id: str) -> Dict:
    resp = call_to_booking_service()

    tracer.put_annotation("BookingConfirmation", resp["requestId"])
    tracer.put_metadata("Booking confirmation", resp)

    return resp

def lambda_handler(event: dict, context: Any) -> Dict:
    booking_id = event.get("booking_id")
    asyncio.run(confirm_booking(booking_id=booking_id))

Custom generator function using capture_method decorator

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
def bookings_generator(booking_id):
    resp = call_to_booking_service()
    yield resp[0]
    yield resp[1]

def lambda_handler(event: dict, context: Any) -> Dict:
    gen = bookings_generator(booking_id=booking_id)
    result = list(gen)

Custom generator context manager using capture_method decorator

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
@contextlib.contextmanager
def booking_actions(booking_id):
    resp = call_to_booking_service()
    yield "example result"
    cleanup_stuff()

def lambda_handler(event: dict, context: Any) -> Dict:
    booking_id = event.get("booking_id")

    with booking_actions(booking_id=booking_id) as booking:
        result = booking

Tracing nested async calls

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
async def get_identity():
    ...

@tracer.capture_method
async def long_async_call():
    ...

@tracer.capture_method
async def async_tasks():
    await get_identity()
    ret = await long_async_call()

    return { "task": "done", **ret }

Safely tracing concurrent async calls with decorator

This may not be needed once this bug is closed

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

async def get_identity():
    async with aioboto3.client("sts") as sts:
        account = await sts.get_caller_identity()
        return account

async def long_async_call():
    ...

@tracer.capture_method
async def async_tasks():
    _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

    return { "task": "done", **ret }

Safely tracing each concurrent async calls with escape hatch

This may not be needed once this bug is closed

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

async def get_identity():
    async with tracer.provider.in_subsegment_async("## get_identity"):
        ...

async def long_async_call():
    async with tracer.provider.in_subsegment_async("## long_async_call"):
        ...

@tracer.capture_method
async def async_tasks():
    _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

    return { "task": "done", **ret }

Raises

err
Exception raised by method
Expand source code
def capture_method(
    self,
    method: Optional[AnyCallableT] = None,
    capture_response: Optional[bool] = None,
    capture_error: Optional[bool] = None,
) -> AnyCallableT:
    """Decorator that runs an arbitrary function inside its own subsegment

    The response and any raised exception can be stored as metadata on a
    subsegment named `## <method_module.method_qualifiedname>`.
    See [Qualified name for classes and functions](https://peps.python.org/pep-3155/).

    When running [async functions concurrently](https://docs.python.org/3/library/asyncio-task.html#id6),
    methods may impact each other's subsegment, and can trigger
    an AlreadyEndedException from X-Ray due to their async nature.

    For this use case, either use `capture_method` only where
    `asyncio.gather` is called, or use the `in_subsegment_async`
    context manager via our escape hatch mechanism - see examples.

    Parameters
    ----------
    method : Callable
        Method to annotate on. When omitted, the decorator was called with
        parameters and a partial bound to those parameters is returned instead
    capture_response : bool, optional
        Whether to include the method's response as metadata. It is resolved by
        `resolve_truthy_env_var_choice` together with the environment variable named by
        `constants.TRACER_CAPTURE_RESPONSE_ENV`, which is read with a default of "true"
    capture_error : bool, optional
        Whether to include the method's error as metadata. It is resolved the same way
        using `constants.TRACER_CAPTURE_ERROR_ENV`, which is read with a default of "true"

    Example
    -------
    **Custom function using capture_method decorator**

        tracer = Tracer(service="payment")
        @tracer.capture_method
        def some_function():
            ...

    **Custom async method using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        async def confirm_booking(booking_id: str) -> Dict:
            resp = call_to_booking_service()

            tracer.put_annotation("BookingConfirmation", resp["requestId"])
            tracer.put_metadata("Booking confirmation", resp)

            return resp

        def lambda_handler(event: dict, context: Any) -> Dict:
            booking_id = event.get("booking_id")
            asyncio.run(confirm_booking(booking_id=booking_id))

    **Custom generator function using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        def bookings_generator(booking_id):
            resp = call_to_booking_service()
            yield resp[0]
            yield resp[1]

        def lambda_handler(event: dict, context: Any) -> Dict:
            gen = bookings_generator(booking_id=booking_id)
            result = list(gen)

    **Custom generator context manager using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        @contextlib.contextmanager
        def booking_actions(booking_id):
            resp = call_to_booking_service()
            yield "example result"
            cleanup_stuff()

        def lambda_handler(event: dict, context: Any) -> Dict:
            booking_id = event.get("booking_id")

            with booking_actions(booking_id=booking_id) as booking:
                result = booking

    **Tracing nested async calls**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        async def get_identity():
            ...

        @tracer.capture_method
        async def long_async_call():
            ...

        @tracer.capture_method
        async def async_tasks():
            await get_identity()
            ret = await long_async_call()

            return { "task": "done", **ret }

    **Safely tracing concurrent async calls with decorator**

    This may not be needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        async def get_identity():
            async with aioboto3.client("sts") as sts:
                account = await sts.get_caller_identity()
                return account

        async def long_async_call():
            ...

        @tracer.capture_method
        async def async_tasks():
            _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

            return { "task": "done", **ret }

    **Safely tracing each concurrent async calls with escape hatch**

    This may not be needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        async def get_identity():
            async with tracer.provider.in_subsegment_async("## get_identity"):
                ...

        async def long_async_call():
            async with tracer.provider.in_subsegment_async("## long_async_call"):
                ...

        @tracer.capture_method
        async def async_tasks():
            _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

            return { "task": "done", **ret }

    Raises
    ------
    err
        Exception raised by method
    """
    # No method means we were used as `@tracer.capture_method(...)`:
    # hand back a partial that remembers the options and waits for the method
    if method is None:
        logger.debug("Decorator called with parameters")
        configured_decorator = functools.partial(
            self.capture_method, capture_response=capture_response, capture_error=capture_error
        )
        return cast(AnyCallableT, configured_decorator)

    # Example: app.ClassA.get_all  # noqa E800
    method_name = f"{method.__module__}.{method.__qualname__}"

    capture_response = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"), choice=capture_response
    )
    capture_error = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"), choice=capture_error
    )

    # Maintenance: Need a factory/builder here to simplify this now
    # Pick the wrapper matching the kind of callable, then apply it once below
    if inspect.iscoroutinefunction(method):
        decorate_with = self._decorate_async_function
    elif inspect.isgeneratorfunction(method):
        decorate_with = self._decorate_generator_function
    elif hasattr(method, "__wrapped__") and inspect.isgeneratorfunction(method.__wrapped__):  # type: ignore
        # A wrapper around a generator function, as in the contextlib.contextmanager example above
        decorate_with = self._decorate_generator_function_with_context_manager
    else:
        decorate_with = self._decorate_sync_function

    return decorate_with(
        method=method, capture_response=capture_response, capture_error=capture_error, method_name=method_name
    )
def ignore_endpoint(self, hostname: Optional[str] = None, urls: Optional[List[str]] = None)

If you want to ignore certain httplib requests you can do so based on the hostname or URL that is being requested.

NOTE: If the provider is not xray, nothing will be added to ignore list

Documentation: https://github.com/aws/aws-xray-sdk-python#ignoring-httplib-requests

Parameters

hostname : Optional, str
The hostname is matched using the Python fnmatch library which does Unix glob style matching.
urls : Optional, List[str]
List of URLs to ignore. Example: tracer.ignore_endpoint(urls=["/ignored-url"])
Expand source code
def ignore_endpoint(self, hostname: Optional[str] = None, urls: Optional[List[str]] = None):
    """Exclude selected httplib requests from tracing, chosen by hostname or by URL.

    > NOTE: This is a no-op unless the configured provider is xray

    Documentation
    --------------
    - https://github.com/aws/aws-xray-sdk-python#ignoring-httplib-requests

    Parameters
    ----------
    hostname : Optional, str
        Hostname pattern, matched using the Python fnmatch library (Unix glob style matching).
    urls: Optional, List[str]
        URLs that should be ignored. Example `tracer.ignore_endpoint(urls=["/ignored-url"])`
    """
    if self._is_xray_provider():
        # Imported lazily: the X-Ray SDK is only touched once the provider check has passed
        from aws_xray_sdk.ext.httplib import add_ignored as register_ignored  # type: ignore

        register_ignored(hostname=hostname, urls=urls)
def patch(self, modules: Optional[Sequence[str]] = None)

Patch modules for instrumentation.

Patches all supported modules by default if none are given.

Parameters

modules : Optional[Sequence[str]]
List of modules to be patched; defaults to None, in which case all supported modules are patched
Expand source code
def patch(self, modules: Optional[Sequence[str]] = None):
    """Instrument modules through the tracing provider.

    When no modules are given, every module the provider supports is patched.
    Nothing is patched while tracing is disabled.

    Parameters
    ----------
    modules : Optional[Sequence[str]]
        Names of the modules to patch; None (the default) patches all of them
    """
    if self.disabled:
        logger.debug("Tracing has been disabled, aborting patch")
    elif modules is None:
        self.provider.patch_all()
    else:
        self.provider.patch(modules)
def put_annotation(self, key: str, value: Union[str, numbers.Number, bool])

Adds annotation to existing segment or subsegment

Parameters

key : str
Annotation key
value : Union[str, numbers.Number, bool]
Value for annotation

Example

Custom annotation for a pseudo service named payment

tracer = Tracer(service="payment")
tracer.put_annotation("PaymentStatus", "CONFIRMED")
Expand source code
def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]):
    """Attach an annotation to the current segment or subsegment

    Nothing is recorded while tracing is disabled.

    Parameters
    ----------
    key : str
        Name of the annotation
    value : Union[str, numbers.Number, bool]
        Value stored under that name

    Example
    -------
    Custom annotation for a pseudo service named payment

        tracer = Tracer(service="payment")
        tracer.put_annotation("PaymentStatus", "CONFIRMED")
    """
    if not self.disabled:
        logger.debug(f"Annotating on key '{key}' with '{value}'")
        self.provider.put_annotation(key=key, value=value)
    else:
        logger.debug("Tracing has been disabled, aborting put_annotation")
def put_metadata(self, key: str, value: Any, namespace: Optional[str] = None)

Adds metadata to existing segment or subsegment

Parameters

key : str
Metadata key
value : any
Value for metadata
namespace : str, optional
Namespace that metadata will lie under, by default None

Example

Custom metadata for a pseudo service named payment

tracer = Tracer(service="payment")
response = collect_payment()
tracer.put_metadata("Payment collection", response)
Expand source code
def put_metadata(self, key: str, value: Any, namespace: Optional[str] = None):
    """Adds metadata to existing segment or subsegment

    Nothing is recorded while tracing is disabled.

    Parameters
    ----------
    key : str
        Metadata key
    value : any
        Value for metadata
    namespace : str, optional
        Namespace that metadata will lie under, by default None.
        A falsy namespace falls back to the tracer's service name.

    Example
    -------
    Custom metadata for a pseudo service named payment

        tracer = Tracer(service="payment")
        response = collect_payment()
        tracer.put_metadata("Payment collection", response)
    """
    if self.disabled:
        logger.debug("Tracing has been disabled, aborting put_metadata")
        return

    namespace = namespace or self.service
    # Lazy %-style arguments: `value` may be an arbitrarily large payload, so it is only
    # rendered to text when debug logging is actually enabled (an f-string would always pay).
    logger.debug("Adding metadata on key '%s' with '%s' at namespace '%s'", key, value, namespace)
    self.provider.put_metadata(key=key, value=value, namespace=namespace)