Package aws_lambda_powertools

Top-level package for Lambda Python Powertools.

Expand source code
# -*- coding: utf-8 -*-

"""Top-level package for Lambda Python Powertools."""

from pathlib import Path

from aws_lambda_powertools.logging import Logger
from aws_lambda_powertools.metrics import Metrics, single_metric
from aws_lambda_powertools.package_logger import set_package_logger_handler
from aws_lambda_powertools.shared.user_agent import inject_user_agent
from aws_lambda_powertools.shared.version import VERSION
from aws_lambda_powertools.tracing import Tracer

__version__ = VERSION
__author__ = """Amazon Web Services"""
__all__ = [
    "Logger",
    "Metrics",
    "single_metric",
    "Tracer",
]

PACKAGE_PATH = Path(__file__).parent

set_package_logger_handler()

inject_user_agent()

Sub-modules

aws_lambda_powertools.event_handler

Event handler decorators for common Lambda events

aws_lambda_powertools.exceptions

Shared exceptions that don't belong to a single utility

aws_lambda_powertools.logging

Logging utility

aws_lambda_powertools.metrics

CloudWatch Embedded Metric Format utility

aws_lambda_powertools.middleware_factory

Utilities to enhance middlewares

aws_lambda_powertools.package_logger
aws_lambda_powertools.shared

Internal shared functions. Do not rely on it besides internal usage.

aws_lambda_powertools.tracing

Tracing utility

aws_lambda_powertools.utilities

General utilities for Powertools

Functions

def single_metric(name: str, unit: MetricUnit, value: float, resolution: MetricResolution | int = 60, namespace: str | None = None, default_dimensions: Dict[str, str] | None = None) ‑> Generator[SingleMetric, None, None]

Context manager to simplify creation of a single metric

Example

Creates cold start metric with function_version as dimension

from aws_lambda_powertools import single_metric
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.metrics import MetricResolution

with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, resolution=MetricResolution.Standard, namespace="ServerlessAirline") as metric:
    metric.add_dimension(name="function_version", value="47")

Same as above but set namespace using environment variable

$ export POWERTOOLS_METRICS_NAMESPACE="ServerlessAirline"

from aws_lambda_powertools import single_metric
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.metrics import MetricResolution

with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, resolution=MetricResolution.Standard) as metric:
    metric.add_dimension(name="function_version", value="47")

Parameters

name : str
Metric name
unit : MetricUnit
aws_lambda_powertools.helper.models.MetricUnit
resolution : MetricResolution
aws_lambda_powertools.helper.models.MetricResolution
value : float
Metric value
namespace : str
Namespace for metrics
default_dimensions : Dict[str, str], optional
Metric dimensions as key=value that will always be present

Yields

SingleMetric
SingleMetric class instance

Raises

MetricUnitError
When metric metric isn't supported by CloudWatch
MetricResolutionError
When metric resolution isn't supported by CloudWatch
MetricValueError
When metric value isn't a number
SchemaValidationError
When metric object fails EMF schema validation
Expand source code
@contextmanager
def single_metric(
    name: str,
    unit: MetricUnit,
    value: float,
    resolution: MetricResolution | int = 60,
    namespace: str | None = None,
    default_dimensions: Dict[str, str] | None = None,
) -> Generator[SingleMetric, None, None]:
    """Context manager to simplify creation of a single metric

    Example
    -------
    **Creates cold start metric with function_version as dimension**

        from aws_lambda_powertools import single_metric
        from aws_lambda_powertools.metrics import MetricUnit
        from aws_lambda_powertools.metrics import MetricResolution

        with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, resolution=MetricResolution.Standard, namespace="ServerlessAirline") as metric:
            metric.add_dimension(name="function_version", value="47")

    **Same as above but set namespace using environment variable**

        $ export POWERTOOLS_METRICS_NAMESPACE="ServerlessAirline"

        from aws_lambda_powertools import single_metric
        from aws_lambda_powertools.metrics import MetricUnit
        from aws_lambda_powertools.metrics import MetricResolution

        with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, resolution=MetricResolution.Standard) as metric:
            metric.add_dimension(name="function_version", value="47")

    Parameters
    ----------
    name : str
        Metric name
    unit : MetricUnit
        `aws_lambda_powertools.helper.models.MetricUnit`
    resolution : MetricResolution
        `aws_lambda_powertools.helper.models.MetricResolution`
    value : float
        Metric value
    namespace: str
        Namespace for metrics
    default_dimensions: Dict[str, str], optional
        Metric dimensions as key=value that will always be present


    Yields
    -------
    SingleMetric
        SingleMetric class instance

    Raises
    ------
    MetricUnitError
        When metric metric isn't supported by CloudWatch
    MetricResolutionError
        When metric resolution isn't supported by CloudWatch
    MetricValueError
        When metric value isn't a number
    SchemaValidationError
        When metric object fails EMF schema validation
    """  # noqa: E501
    metric_set: Dict | None = None
    try:
        metric: SingleMetric = SingleMetric(namespace=namespace)
        metric.add_metric(name=name, unit=unit, value=value, resolution=resolution)

        if default_dimensions:
            for dim_name, dim_value in default_dimensions.items():
                metric.add_dimension(name=dim_name, value=dim_value)

        yield metric
        metric_set = metric.serialize_metric_set()
    finally:
        print(json.dumps(metric_set, separators=(",", ":")))

Classes

class Logger (service: Optional[str] = None, level: Union[str, int, None] = None, child: bool = False, sampling_rate: Optional[float] = None, stream: Optional[IO[str]] = None, logger_formatter: Optional[PowertoolsFormatter] = None, logger_handler: Optional[logging.Handler] = None, log_uncaught_exceptions: bool = False, json_serializer: Optional[Callable[[Dict], str]] = None, json_deserializer: Optional[Callable[[Union[Dict, str, bool, int, float]], str]] = None, json_default: Optional[Callable[[Any], Any]] = None, datefmt: Optional[str] = None, use_datetime_directive: bool = False, log_record_order: Optional[List[str]] = None, utc: bool = False, use_rfc3339: bool = False, serialize_stacktrace: bool = True, **kwargs)

Creates and setups a logger to format statements in JSON.

Includes service name and any additional key=value into logs It also accepts both service name or level explicitly via env vars

Environment Variables

POWERTOOLS_SERVICE_NAME : str service name POWERTOOLS_LOG_LEVEL: str logging level (e.g. INFO, DEBUG) POWERTOOLS_LOGGER_SAMPLE_RATE: float sampling rate ranging from 0 to 1, 1 being 100% sampling

Parameters

service : str, optional
service name to be appended in logs, by default "service_undefined"
level : str, int optional
The level to set. Can be a string representing the level name: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' or an integer representing the level value: 10 for 'DEBUG', 20 for 'INFO', 30 for 'WARNING', 40 for 'ERROR', 50 for 'CRITICAL'. by default "INFO"
child : bool, optional
create a child Logger named ., False by default
sample_rate : float, optional
sample rate for debug calls within execution context defaults to 0.0
stream : sys.stdout, optional
valid output for a logging stream, by default sys.stdout
logger_formatter : PowertoolsFormatter, optional
custom logging formatter that implements PowertoolsFormatter
logger_handler : logging.Handler, optional
custom logging handler e.g. logging.FileHandler("file.log")
log_uncaught_exceptions : bool, by default False

logs uncaught exception using sys.excepthook

See: https://docs.python.org/3/library/sys.html#sys.excepthook

Parameters Propagated To Lambdapowertoolsformatter

datefmt: str, optional String directives (strftime) to format log timestamp using time, by default it uses 2021-05-03 11:47:12,494+0200. use_datetime_directive: bool, optional Interpret datefmt as a format string for datetime.datetime.strftime, rather than time.strftime.

See <https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior> . This
also supports a custom %F directive for milliseconds.

use_rfc3339: bool, optional Whether to use a popular date format that complies with both RFC3339 and ISO8601. e.g., 2022-10-27T16:27:43.738+02:00. json_serializer : Callable, optional function to serialize obj to a JSON formatted str, by default json.dumps json_deserializer : Callable, optional function to deserialize str, bytes, bytearray containing a JSON document to a Python obj`, by default json.loads json_default : Callable, optional function to coerce unserializable values, by default str()

Only used when no custom formatter is set

utc : bool, optional set logging timestamp to UTC, by default False to continue to use local time as per stdlib log_record_order : list, optional set order of log keys when logging, by default ["level", "location", "message", "timestamp"]

Example

Setups structured logging in JSON for Lambda functions with explicit service name

>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> def handler(event, context):
        logger.info("Hello")

Setups structured logging in JSON for Lambda functions using env vars

$ export POWERTOOLS_SERVICE_NAME="payment"
$ export POWERTOOLS_LOGGER_SAMPLE_RATE=0.01 # 1% debug sampling
>>> from aws_lambda_powertools import Logger
>>> logger = Logger()
>>>
>>> def handler(event, context):
        logger.info("Hello")

Append payment_id to previously setup logger

>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> def handler(event, context):
        logger.append_keys(payment_id=event["payment_id"])
        logger.info("Hello")

Create child Logger using logging inheritance via child param

>>> # app.py
>>> import another_file
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> # another_file.py
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment", child=True)

Logging in UTC timezone

>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", utc=True)

Brings message as the first key in log statements

>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", log_record_order=["message"])

Logging to a file instead of standard output for testing

>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", logger_handler=logging.FileHandler("log.json"))

Raises

InvalidLoggerSamplingRateError
When sampling rate provided is not a float
Expand source code
class Logger:
    """Creates and setups a logger to format statements in JSON.

    Includes service name and any additional key=value into logs
    It also accepts both service name or level explicitly via env vars

    Environment variables
    ---------------------
    POWERTOOLS_SERVICE_NAME : str
        service name
    POWERTOOLS_LOG_LEVEL: str
        logging level (e.g. INFO, DEBUG)
    POWERTOOLS_LOGGER_SAMPLE_RATE: float
        sampling rate ranging from 0 to 1, 1 being 100% sampling

    Parameters
    ----------
    service : str, optional
        service name to be appended in logs, by default "service_undefined"
    level : str, int optional
        The level to set. Can be a string representing the level name: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
        or an integer representing the level value: 10 for 'DEBUG', 20 for 'INFO', 30 for 'WARNING', 40 for 'ERROR', 50 for 'CRITICAL'.
        by default "INFO"
    child: bool, optional
        create a child Logger named <service>.<caller_file_name>, False by default
    sample_rate: float, optional
        sample rate for debug calls within execution context defaults to 0.0
    stream: sys.stdout, optional
        valid output for a logging stream, by default sys.stdout
    logger_formatter: PowertoolsFormatter, optional
        custom logging formatter that implements PowertoolsFormatter
    logger_handler: logging.Handler, optional
        custom logging handler e.g. logging.FileHandler("file.log")
    log_uncaught_exceptions: bool, by default False
        logs uncaught exception using sys.excepthook

        See: https://docs.python.org/3/library/sys.html#sys.excepthook


    Parameters propagated to LambdaPowertoolsFormatter
    --------------------------------------------------
    datefmt: str, optional
        String directives (strftime) to format log timestamp using `time`, by default it uses 2021-05-03 11:47:12,494+0200.
    use_datetime_directive: bool, optional
        Interpret `datefmt` as a format string for `datetime.datetime.strftime`, rather than
        `time.strftime`.

        See https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior . This
        also supports a custom %F directive for milliseconds.
    use_rfc3339: bool, optional
        Whether to use a popular date format that complies with both RFC3339 and ISO8601.
        e.g., 2022-10-27T16:27:43.738+02:00.
    json_serializer : Callable, optional
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer : Callable, optional
        function to deserialize `str`, `bytes`, bytearray` containing a JSON document to a Python `obj`,
        by default json.loads
    json_default : Callable, optional
        function to coerce unserializable values, by default `str()`

        Only used when no custom formatter is set
    utc : bool, optional
        set logging timestamp to UTC, by default False to continue to use local time as per stdlib
    log_record_order : list, optional
        set order of log keys when logging, by default ["level", "location", "message", "timestamp"]

    Example
    -------
    **Setups structured logging in JSON for Lambda functions with explicit service name**

        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment")
        >>>
        >>> def handler(event, context):
                logger.info("Hello")

    **Setups structured logging in JSON for Lambda functions using env vars**

        $ export POWERTOOLS_SERVICE_NAME="payment"
        $ export POWERTOOLS_LOGGER_SAMPLE_RATE=0.01 # 1% debug sampling
        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger()
        >>>
        >>> def handler(event, context):
                logger.info("Hello")

    **Append payment_id to previously setup logger**

        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment")
        >>>
        >>> def handler(event, context):
                logger.append_keys(payment_id=event["payment_id"])
                logger.info("Hello")

    **Create child Logger using logging inheritance via child param**

        >>> # app.py
        >>> import another_file
        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment")
        >>>
        >>> # another_file.py
        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment", child=True)

    **Logging in UTC timezone**

        >>> # app.py
        >>> import logging
        >>> from aws_lambda_powertools import Logger
        >>>
        >>> logger = Logger(service="payment", utc=True)

    **Brings message as the first key in log statements**

        >>> # app.py
        >>> import logging
        >>> from aws_lambda_powertools import Logger
        >>>
        >>> logger = Logger(service="payment", log_record_order=["message"])

    **Logging to a file instead of standard output for testing**

        >>> # app.py
        >>> import logging
        >>> from aws_lambda_powertools import Logger
        >>>
        >>> logger = Logger(service="payment", logger_handler=logging.FileHandler("log.json"))

    Raises
    ------
    InvalidLoggerSamplingRateError
        When sampling rate provided is not a float
    """  # noqa: E501

    def __init__(
        self,
        service: Optional[str] = None,
        level: Union[str, int, None] = None,
        child: bool = False,
        sampling_rate: Optional[float] = None,
        stream: Optional[IO[str]] = None,
        logger_formatter: Optional[PowertoolsFormatter] = None,
        logger_handler: Optional[logging.Handler] = None,
        log_uncaught_exceptions: bool = False,
        json_serializer: Optional[Callable[[Dict], str]] = None,
        json_deserializer: Optional[Callable[[Union[Dict, str, bool, int, float]], str]] = None,
        json_default: Optional[Callable[[Any], Any]] = None,
        datefmt: Optional[str] = None,
        use_datetime_directive: bool = False,
        log_record_order: Optional[List[str]] = None,
        utc: bool = False,
        use_rfc3339: bool = False,
        serialize_stacktrace: bool = True,
        **kwargs,
    ) -> None:
        self.service = resolve_env_var_choice(
            choice=service,
            env=os.getenv(constants.SERVICE_NAME_ENV, "service_undefined"),
        )
        self.sampling_rate = resolve_env_var_choice(
            choice=sampling_rate,
            env=os.getenv(constants.LOGGER_LOG_SAMPLING_RATE),
        )
        self.child = child
        self.logger_formatter = logger_formatter
        self._stream = stream or sys.stdout
        self.logger_handler = logger_handler or logging.StreamHandler(self._stream)
        self.log_uncaught_exceptions = log_uncaught_exceptions

        self._is_deduplication_disabled = resolve_truthy_env_var_choice(
            env=os.getenv(constants.LOGGER_LOG_DEDUPLICATION_ENV, "false"),
        )
        self._default_log_keys = {"service": self.service, "sampling_rate": self.sampling_rate}
        self._logger = self._get_logger()

        # NOTE: This is primarily to improve UX, so IDEs can autocomplete LambdaPowertoolsFormatter options
        # previously, we masked all of them as kwargs thus limiting feature discovery
        formatter_options = {
            "json_serializer": json_serializer,
            "json_deserializer": json_deserializer,
            "json_default": json_default,
            "datefmt": datefmt,
            "use_datetime_directive": use_datetime_directive,
            "log_record_order": log_record_order,
            "utc": utc,
            "use_rfc3339": use_rfc3339,
            "serialize_stacktrace": serialize_stacktrace,
        }

        self._init_logger(formatter_options=formatter_options, log_level=level, **kwargs)

        if self.log_uncaught_exceptions:
            logger.debug("Replacing exception hook")
            sys.excepthook = functools.partial(log_uncaught_exception_hook, logger=self)

    # Prevent __getattr__ from shielding unknown attribute errors in type checkers
    # https://github.com/aws-powertools/powertools-lambda-python/issues/1660
    if not TYPE_CHECKING:

        def __getattr__(self, name):
            # Proxy attributes not found to actual logger to support backward compatibility
            # https://github.com/aws-powertools/powertools-lambda-python/issues/97
            return getattr(self._logger, name)

    def _get_logger(self) -> logging.Logger:
        """Returns a Logger named {self.service}, or {self.service.filename} for child loggers"""
        logger_name = self.service
        if self.child:
            logger_name = f"{self.service}.{_get_caller_filename()}"

        return logging.getLogger(logger_name)

    def _init_logger(
        self,
        formatter_options: Optional[Dict] = None,
        log_level: Union[str, int, None] = None,
        **kwargs,
    ) -> None:
        """Configures new logger"""

        # Skip configuration if it's a child logger or a pre-configured logger
        # to prevent the following:
        #   a) multiple handlers being attached
        #   b) different sampling mechanisms
        #   c) multiple messages from being logged as handlers can be duplicated
        is_logger_preconfigured = getattr(self._logger, "init", False)
        if self.child or is_logger_preconfigured:
            return

        self.setLevel(log_level)
        self._configure_sampling()
        self.addHandler(self.logger_handler)
        self.structure_logs(formatter_options=formatter_options, **kwargs)

        # Pytest Live Log feature duplicates log records for colored output
        # but we explicitly add a filter for log deduplication.
        # This flag disables this protection when you explicit want logs to be duplicated (#262)
        if not self._is_deduplication_disabled:
            logger.debug("Adding filter in root logger to suppress child logger records to bubble up")
            for handler in logging.root.handlers:
                # It'll add a filter to suppress any child logger from self.service
                # Example: `Logger(service="order")`, where service is Order
                # It'll reject all loggers starting with `order` e.g. order.checkout, order.shared
                handler.addFilter(SuppressFilter(self.service))

        # as per bug in #249, we should not be pre-configuring an existing logger
        # therefore we set a custom attribute in the Logger that will be returned
        # std logging will return the same Logger with our attribute if name is reused
        logger.debug(f"Marking logger {self.service} as preconfigured")
        self._logger.init = True  # type: ignore[attr-defined]

    def _configure_sampling(self) -> None:
        """Dynamically set log level based on sampling rate

        Raises
        ------
        InvalidLoggerSamplingRateError
            When sampling rate provided is not a float
        """
        try:
            if self.sampling_rate and random.random() <= float(self.sampling_rate):
                logger.debug("Setting log level to Debug due to sampling rate")
                self._logger.setLevel(logging.DEBUG)
        except ValueError:
            raise InvalidLoggerSamplingRateError(
                (
                    f"Expected a float value ranging 0 to 1, but received {self.sampling_rate} instead."
                    "Please review POWERTOOLS_LOGGER_SAMPLE_RATE environment variable."
                ),
            )

    @overload
    def inject_lambda_context(
        self,
        lambda_handler: AnyCallableT,
        log_event: Optional[bool] = None,
        correlation_id_path: Optional[str] = None,
        clear_state: Optional[bool] = False,
    ) -> AnyCallableT: ...

    @overload
    def inject_lambda_context(
        self,
        lambda_handler: None = None,
        log_event: Optional[bool] = None,
        correlation_id_path: Optional[str] = None,
        clear_state: Optional[bool] = False,
    ) -> Callable[[AnyCallableT], AnyCallableT]: ...

    def inject_lambda_context(
        self,
        lambda_handler: Optional[AnyCallableT] = None,
        log_event: Optional[bool] = None,
        correlation_id_path: Optional[str] = None,
        clear_state: Optional[bool] = False,
    ) -> Any:
        """Decorator to capture Lambda contextual info and inject into logger

        Parameters
        ----------
        clear_state : bool, optional
            Instructs logger to remove any custom keys previously added
        lambda_handler : Callable
            Method to inject the lambda context
        log_event : bool, optional
            Instructs logger to log Lambda Event, by default False
        correlation_id_path: str, optional
            Optional JMESPath for the correlation_id

        Environment variables
        ---------------------
        POWERTOOLS_LOGGER_LOG_EVENT : str
            instruct logger to log Lambda Event (e.g. `"true", "True", "TRUE"`)

        Example
        -------
        **Captures Lambda contextual runtime info (e.g memory, arn, req_id)**

            from aws_lambda_powertools import Logger

            logger = Logger(service="payment")

            @logger.inject_lambda_context
            def handler(event, context):
                logger.info("Hello")

        **Captures Lambda contextual runtime info and logs incoming request**

            from aws_lambda_powertools import Logger

            logger = Logger(service="payment")

            @logger.inject_lambda_context(log_event=True)
            def handler(event, context):
                logger.info("Hello")

        Returns
        -------
        decorate : Callable
            Decorated lambda handler
        """

        # If handler is None we've been called with parameters
        # Return a partial function with args filled
        if lambda_handler is None:
            logger.debug("Decorator called with parameters")
            return functools.partial(
                self.inject_lambda_context,
                log_event=log_event,
                correlation_id_path=correlation_id_path,
                clear_state=clear_state,
            )

        log_event = resolve_truthy_env_var_choice(
            env=os.getenv(constants.LOGGER_LOG_EVENT_ENV, "false"),
            choice=log_event,
        )

        @functools.wraps(lambda_handler)
        def decorate(event, context, *args, **kwargs):
            lambda_context = build_lambda_context_model(context)
            cold_start = _is_cold_start()

            if clear_state:
                self.structure_logs(cold_start=cold_start, **lambda_context.__dict__)
            else:
                self.append_keys(cold_start=cold_start, **lambda_context.__dict__)

            if correlation_id_path:
                self.set_correlation_id(
                    jmespath_utils.extract_data_from_envelope(envelope=correlation_id_path, data=event),
                )

            if log_event:
                logger.debug("Event received")
                self.info(extract_event_from_common_models(event))

            return lambda_handler(event, context, *args, **kwargs)

        return decorate

    def info(
        self,
        msg: object,
        *args: object,
        exc_info: logging._ExcInfoType = None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs: object,
    ) -> None:
        extra = extra or {}
        extra = {**extra, **kwargs}

        return self._logger.info(
            msg,
            *args,
            exc_info=exc_info,
            stack_info=stack_info,
            stacklevel=stacklevel,
            extra=extra,
        )

    def error(
        self,
        msg: object,
        *args: object,
        exc_info: logging._ExcInfoType = None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs: object,
    ) -> None:
        extra = extra or {}
        extra = {**extra, **kwargs}

        return self._logger.error(
            msg,
            *args,
            exc_info=exc_info,
            stack_info=stack_info,
            stacklevel=stacklevel,
            extra=extra,
        )

    def exception(
        self,
        msg: object,
        *args: object,
        exc_info: logging._ExcInfoType = True,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs: object,
    ) -> None:
        extra = extra or {}
        extra = {**extra, **kwargs}

        return self._logger.exception(
            msg,
            *args,
            exc_info=exc_info,
            stack_info=stack_info,
            stacklevel=stacklevel,
            extra=extra,
        )

    def critical(
        self,
        msg: object,
        *args: object,
        exc_info: logging._ExcInfoType = None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs: object,
    ) -> None:
        extra = extra or {}
        extra = {**extra, **kwargs}

        return self._logger.critical(
            msg,
            *args,
            exc_info=exc_info,
            stack_info=stack_info,
            stacklevel=stacklevel,
            extra=extra,
        )

    def warning(
        self,
        msg: object,
        *args: object,
        exc_info: logging._ExcInfoType = None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs: object,
    ) -> None:
        extra = extra or {}
        extra = {**extra, **kwargs}

        return self._logger.warning(
            msg,
            *args,
            exc_info=exc_info,
            stack_info=stack_info,
            stacklevel=stacklevel,
            extra=extra,
        )

    def debug(
        self,
        msg: object,
        *args: object,
        exc_info: logging._ExcInfoType = None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs: object,
    ) -> None:
        extra = extra or {}
        extra = {**extra, **kwargs}

        return self._logger.debug(
            msg,
            *args,
            exc_info=exc_info,
            stack_info=stack_info,
            stacklevel=stacklevel,
            extra=extra,
        )

    def append_keys(self, **additional_keys: object) -> None:
        self.registered_formatter.append_keys(**additional_keys)

    def get_current_keys(self) -> Dict[str, Any]:
        return self.registered_formatter.get_current_keys()

    def remove_keys(self, keys: Iterable[str]) -> None:
        self.registered_formatter.remove_keys(keys)

    def structure_logs(self, append: bool = False, formatter_options: Optional[Dict] = None, **keys) -> None:
        """Sets logging formatting to JSON.

        Optionally, it can append keyword arguments
        to an existing logger, so it is available across future log statements.

        Last keyword argument and value wins if duplicated.

        Parameters
        ----------
        append : bool, optional
            append keys provided to logger formatter, by default False
        formatter_options : dict, optional
            LambdaPowertoolsFormatter options to be propagated, by default {}
        """
        formatter_options = formatter_options or {}

        # There are 3 operational modes for this method
        ## 1. Register a Powertools for AWS Lambda (Python) Formatter for the first time
        ## 2. Append new keys to the current logger formatter; deprecated in favour of append_keys
        ## 3. Add new keys and discard existing to the registered formatter

        # Mode 1
        log_keys = {**self._default_log_keys, **keys}
        is_logger_preconfigured = getattr(self._logger, "init", False)
        if not is_logger_preconfigured:
            formatter = self.logger_formatter or LambdaPowertoolsFormatter(**formatter_options, **log_keys)
            self.registered_handler.setFormatter(formatter)

            # when using a custom Powertools for AWS Lambda (Python) Formatter
            # standard and custom keys that are not Powertools for AWS Lambda (Python) Formatter parameters
            # should be appended and custom keys that might happen to be Powertools for AWS Lambda (Python)
            # Formatter parameters should be discarded this prevents adding them as custom keys, for example,
            # `json_default=<callable>` see https://github.com/aws-powertools/powertools-lambda-python/issues/1263
            custom_keys = {k: v for k, v in log_keys.items() if k not in RESERVED_FORMATTER_CUSTOM_KEYS}
            return self.registered_formatter.append_keys(**custom_keys)

        # Mode 2 (legacy)
        if append:
            # Maintenance: Add deprecation warning for major version
            return self.append_keys(**keys)

        # Mode 3
        self.registered_formatter.clear_state()
        self.registered_formatter.append_keys(**log_keys)

    def set_correlation_id(self, value: Optional[str]) -> None:
        """Sets the correlation_id in the logging json

        Parameters
        ----------
        value : str, optional
            Value for the correlation id. None will remove the correlation_id
        """
        self.append_keys(correlation_id=value)

    def get_correlation_id(self) -> Optional[str]:
        """Gets the correlation_id in the logging json

        Returns
        -------
        str, optional
            Value for the correlation id
        """
        if isinstance(self.registered_formatter, LambdaPowertoolsFormatter):
            return self.registered_formatter.log_format.get("correlation_id")
        return None

    def setLevel(self, level: Union[str, int, None]) -> None:
        return self._logger.setLevel(self._determine_log_level(level))

    def addHandler(self, handler: logging.Handler) -> None:
        return self._logger.addHandler(handler)

    def addFilter(self, filter: logging._FilterType) -> None:  # noqa: A002 # filter built-in usage
        return self._logger.addFilter(filter)

    def removeFilter(self, filter: logging._FilterType) -> None:  # noqa: A002 # filter built-in usage
        return self._logger.removeFilter(filter)

    @property
    def registered_handler(self) -> logging.Handler:
        """Convenience property to access the first logger handler"""
        # We ignore mypy here because self.child encodes whether or not self._logger.parent is
        # None, mypy can't see this from context but we can
        handlers = self._logger.parent.handlers if self.child else self._logger.handlers  # type: ignore[union-attr]
        return handlers[0]

    @property
    def registered_formatter(self) -> BasePowertoolsFormatter:
        """Convenience property to access the first logger formatter"""
        return self.registered_handler.formatter  # type: ignore[return-value]

    @property
    def log_level(self) -> int:
        return self._logger.level

    @property
    def name(self) -> str:
        return self._logger.name

    @property
    def handlers(self) -> List[logging.Handler]:
        """List of registered logging handlers

        Notes
        -----

        Looking for the first configured handler? Use registered_handler property instead.
        """
        return self._logger.handlers

    def _get_aws_lambda_log_level(self) -> Optional[str]:
        """
        Retrieve the log level for AWS Lambda from the Advanced Logging Controls feature.
        Returns:
            Optional[str]: The corresponding logging level.
        """

        return constants.LAMBDA_ADVANCED_LOGGING_LEVELS.get(os.getenv(constants.LAMBDA_LOG_LEVEL_ENV))

    def _get_powertools_log_level(self, level: Union[str, int, None]) -> Optional[str]:
        """Retrieve the log level for Powertools from the environment variable or level parameter.
        If log level is an integer, we convert to its respective string level `logging.getLevelName()`.
        If no log level is provided, we check env vars for the log level: POWERTOOLS_LOG_LEVEL_ENV and POWERTOOLS_LOG_LEVEL_LEGACY_ENV.
        Parameters:
        -----------
        level : Union[str, int, None]
            The specified log level as a string, integer, or None.
        Environment variables
        ---------------------
        POWERTOOLS_LOG_LEVEL : str
            log level (e.g: INFO, DEBUG, WARNING, ERROR, CRITICAL)
        LOG_LEVEL (Legacy) : str
            log level (e.g: INFO, DEBUG, WARNING, ERROR, CRITICAL)
        Returns:
        --------
        Optional[str]:
            The corresponding logging level. Returns None if the log level is not explicitly specified.
        """  # noqa E501

        # Extract log level from Powertools Logger env vars
        log_level_env = os.getenv(constants.POWERTOOLS_LOG_LEVEL_ENV) or os.getenv(
            constants.POWERTOOLS_LOG_LEVEL_LEGACY_ENV,
        )
        # If level is an int (logging.INFO), return its respective string ("INFO")
        if isinstance(level, int):
            return logging.getLevelName(level)

        return level or log_level_env

    def _determine_log_level(self, level: Union[str, int, None]) -> Union[str, int]:
        """Determine the effective log level considering Lambda and Powertools preferences.
        It emits an UserWarning if Lambda ALC log level is lower than Logger log level.
        Parameters:
        -----------
        level: Union[str, int, None]
            The specified log level as a string, integer, or None.
        Returns:
        ----------
            Union[str, int]: The effective logging level.
        """

        # This function consider the following order of precedence:
        # 1 - If a log level is set using AWS Lambda Advanced Logging Controls, it sets it.
        # 2 - If a log level is passed to the constructor, it sets it
        # 3 - If a log level is set via setLevel, it sets it.
        # 4 - If a log level is set via Powertools env variables, it sets it.
        # 5 - If none of the above is true, the default log level applies INFO.

        lambda_log_level = self._get_aws_lambda_log_level()
        powertools_log_level = self._get_powertools_log_level(level)

        if powertools_log_level and lambda_log_level:
            # If Powertools log level is set and higher than AWS Lambda Advanced Logging Controls, emit a warning
            if logging.getLevelName(lambda_log_level) > logging.getLevelName(powertools_log_level):
                warnings.warn(
                    f"Current log level ({powertools_log_level}) does not match AWS Lambda Advanced Logging Controls "
                    f"minimum log level ({lambda_log_level}). This can lead to data loss, consider adjusting them.",
                    UserWarning,
                    stacklevel=2,
                )

        # AWS Lambda Advanced Logging Controls takes precedence over Powertools log level and we use this
        if lambda_log_level:
            return lambda_log_level

        # Check if Powertools log level is None, which means it's not set
        # We assume INFO as the default log level
        if powertools_log_level is None:
            return logging.INFO

        # Powertools log level is set, we use this
        return powertools_log_level.upper()

Instance variables

var handlers : List[logging.Handler]

List of registered logging handlers

Notes

Looking for the first configured handler? Use registered_handler property instead.

Expand source code
@property
def handlers(self) -> List[logging.Handler]:
    """List of registered logging handlers

    Notes
    -----

    Looking for the first configured handler? Use registered_handler property instead.
    """
    return self._logger.handlers
var log_level : int
Expand source code
@property
def log_level(self) -> int:
    return self._logger.level
var name : str
Expand source code
@property
def name(self) -> str:
    return self._logger.name
var registered_formatterBasePowertoolsFormatter

Convenience property to access the first logger formatter

Expand source code
@property
def registered_formatter(self) -> BasePowertoolsFormatter:
    """Convenience property to access the first logger formatter"""
    return self.registered_handler.formatter  # type: ignore[return-value]
var registered_handler : logging.Handler

Convenience property to access the first logger handler

Expand source code
@property
def registered_handler(self) -> logging.Handler:
    """Convenience property to access the first logger handler"""
    # We ignore mypy here because self.child encodes whether or not self._logger.parent is
    # None, mypy can't see this from context but we can
    handlers = self._logger.parent.handlers if self.child else self._logger.handlers  # type: ignore[union-attr]
    return handlers[0]

Methods

def addFilter(self, filter: logging._FilterType) ‑> None
Expand source code
def addFilter(self, filter: logging._FilterType) -> None:  # noqa: A002 # filter built-in usage
    return self._logger.addFilter(filter)
def addHandler(self, handler: logging.Handler) ‑> None
Expand source code
def addHandler(self, handler: logging.Handler) -> None:
    return self._logger.addHandler(handler)
def append_keys(self, **additional_keys: object) ‑> None
Expand source code
def append_keys(self, **additional_keys: object) -> None:
    self.registered_formatter.append_keys(**additional_keys)
def critical(self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Optional[Mapping[str, object]] = None, **kwargs: object) ‑> None
Expand source code
def critical(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = None,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Optional[Mapping[str, object]] = None,
    **kwargs: object,
) -> None:
    extra = extra or {}
    extra = {**extra, **kwargs}

    return self._logger.critical(
        msg,
        *args,
        exc_info=exc_info,
        stack_info=stack_info,
        stacklevel=stacklevel,
        extra=extra,
    )
def debug(self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Optional[Mapping[str, object]] = None, **kwargs: object) ‑> None
Expand source code
def debug(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = None,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Optional[Mapping[str, object]] = None,
    **kwargs: object,
) -> None:
    extra = extra or {}
    extra = {**extra, **kwargs}

    return self._logger.debug(
        msg,
        *args,
        exc_info=exc_info,
        stack_info=stack_info,
        stacklevel=stacklevel,
        extra=extra,
    )
def error(self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Optional[Mapping[str, object]] = None, **kwargs: object) ‑> None
Expand source code
def error(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = None,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Optional[Mapping[str, object]] = None,
    **kwargs: object,
) -> None:
    extra = extra or {}
    extra = {**extra, **kwargs}

    return self._logger.error(
        msg,
        *args,
        exc_info=exc_info,
        stack_info=stack_info,
        stacklevel=stacklevel,
        extra=extra,
    )
def exception(self, msg: object, *args: object, exc_info: logging._ExcInfoType = True, stack_info: bool = False, stacklevel: int = 2, extra: Optional[Mapping[str, object]] = None, **kwargs: object) ‑> None
Expand source code
def exception(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = True,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Optional[Mapping[str, object]] = None,
    **kwargs: object,
) -> None:
    extra = extra or {}
    extra = {**extra, **kwargs}

    return self._logger.exception(
        msg,
        *args,
        exc_info=exc_info,
        stack_info=stack_info,
        stacklevel=stacklevel,
        extra=extra,
    )
def get_correlation_id(self) ‑> Optional[str]

Gets the correlation_id in the logging json

Returns

str, optional
Value for the correlation id
Expand source code
def get_correlation_id(self) -> Optional[str]:
    """Gets the correlation_id in the logging json

    Returns
    -------
    str, optional
        Value for the correlation id
    """
    if isinstance(self.registered_formatter, LambdaPowertoolsFormatter):
        return self.registered_formatter.log_format.get("correlation_id")
    return None
def get_current_keys(self) ‑> Dict[str, Any]
Expand source code
def get_current_keys(self) -> Dict[str, Any]:
    return self.registered_formatter.get_current_keys()
def info(self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Optional[Mapping[str, object]] = None, **kwargs: object) ‑> None
Expand source code
def info(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = None,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Optional[Mapping[str, object]] = None,
    **kwargs: object,
) -> None:
    extra = extra or {}
    extra = {**extra, **kwargs}

    return self._logger.info(
        msg,
        *args,
        exc_info=exc_info,
        stack_info=stack_info,
        stacklevel=stacklevel,
        extra=extra,
    )
def inject_lambda_context(self, lambda_handler: Optional[AnyCallableT] = None, log_event: Optional[bool] = None, correlation_id_path: Optional[str] = None, clear_state: Optional[bool] = False) ‑> Any

Decorator to capture Lambda contextual info and inject into logger

Parameters

clear_state : bool, optional
Instructs logger to remove any custom keys previously added
lambda_handler : Callable
Method to inject the lambda context
log_event : bool, optional
Instructs logger to log Lambda Event, by default False
correlation_id_path : str, optional
Optional JMESPath for the correlation_id

Environment Variables

POWERTOOLS_LOGGER_LOG_EVENT : str instruct logger to log Lambda Event (e.g. "true", "True", "TRUE")

Example

Captures Lambda contextual runtime info (e.g memory, arn, req_id)

from aws_lambda_powertools import Logger

logger = Logger(service="payment")

@logger.inject_lambda_context
def handler(event, context):
    logger.info("Hello")

Captures Lambda contextual runtime info and logs incoming request

from aws_lambda_powertools import Logger

logger = Logger(service="payment")

@logger.inject_lambda_context(log_event=True)
def handler(event, context):
    logger.info("Hello")

Returns

decorate : Callable
Decorated lambda handler
Expand source code
def inject_lambda_context(
    self,
    lambda_handler: Optional[AnyCallableT] = None,
    log_event: Optional[bool] = None,
    correlation_id_path: Optional[str] = None,
    clear_state: Optional[bool] = False,
) -> Any:
    """Decorator to capture Lambda contextual info and inject into logger

    Parameters
    ----------
    clear_state : bool, optional
        Instructs logger to remove any custom keys previously added
    lambda_handler : Callable
        Method to inject the lambda context
    log_event : bool, optional
        Instructs logger to log Lambda Event, by default False
    correlation_id_path: str, optional
        Optional JMESPath for the correlation_id

    Environment variables
    ---------------------
    POWERTOOLS_LOGGER_LOG_EVENT : str
        instruct logger to log Lambda Event (e.g. `"true", "True", "TRUE"`)

    Example
    -------
    **Captures Lambda contextual runtime info (e.g memory, arn, req_id)**

        from aws_lambda_powertools import Logger

        logger = Logger(service="payment")

        @logger.inject_lambda_context
        def handler(event, context):
            logger.info("Hello")

    **Captures Lambda contextual runtime info and logs incoming request**

        from aws_lambda_powertools import Logger

        logger = Logger(service="payment")

        @logger.inject_lambda_context(log_event=True)
        def handler(event, context):
            logger.info("Hello")

    Returns
    -------
    decorate : Callable
        Decorated lambda handler
    """

    # If handler is None we've been called with parameters
    # Return a partial function with args filled
    if lambda_handler is None:
        logger.debug("Decorator called with parameters")
        return functools.partial(
            self.inject_lambda_context,
            log_event=log_event,
            correlation_id_path=correlation_id_path,
            clear_state=clear_state,
        )

    log_event = resolve_truthy_env_var_choice(
        env=os.getenv(constants.LOGGER_LOG_EVENT_ENV, "false"),
        choice=log_event,
    )

    @functools.wraps(lambda_handler)
    def decorate(event, context, *args, **kwargs):
        lambda_context = build_lambda_context_model(context)
        cold_start = _is_cold_start()

        if clear_state:
            self.structure_logs(cold_start=cold_start, **lambda_context.__dict__)
        else:
            self.append_keys(cold_start=cold_start, **lambda_context.__dict__)

        if correlation_id_path:
            self.set_correlation_id(
                jmespath_utils.extract_data_from_envelope(envelope=correlation_id_path, data=event),
            )

        if log_event:
            logger.debug("Event received")
            self.info(extract_event_from_common_models(event))

        return lambda_handler(event, context, *args, **kwargs)

    return decorate
def removeFilter(self, filter: logging._FilterType) ‑> None
Expand source code
def removeFilter(self, filter: logging._FilterType) -> None:  # noqa: A002 # filter built-in usage
    return self._logger.removeFilter(filter)
def remove_keys(self, keys: Iterable[str]) ‑> None
Expand source code
def remove_keys(self, keys: Iterable[str]) -> None:
    self.registered_formatter.remove_keys(keys)
def setLevel(self, level: Union[str, int, None]) ‑> None
Expand source code
def setLevel(self, level: Union[str, int, None]) -> None:
    return self._logger.setLevel(self._determine_log_level(level))
def set_correlation_id(self, value: Optional[str]) ‑> None

Sets the correlation_id in the logging json

Parameters

value : str, optional
Value for the correlation id. None will remove the correlation_id
Expand source code
def set_correlation_id(self, value: Optional[str]) -> None:
    """Sets the correlation_id in the logging json

    Parameters
    ----------
    value : str, optional
        Value for the correlation id. None will remove the correlation_id
    """
    self.append_keys(correlation_id=value)
def structure_logs(self, append: bool = False, formatter_options: Optional[Dict] = None, **keys) ‑> None

Sets logging formatting to JSON.

Optionally, it can append keyword arguments to an existing logger, so it is available across future log statements.

Last keyword argument and value wins if duplicated.

Parameters

append : bool, optional
append keys provided to logger formatter, by default False
formatter_options : dict, optional
LambdaPowertoolsFormatter options to be propagated, by default {}
Expand source code
def structure_logs(self, append: bool = False, formatter_options: Optional[Dict] = None, **keys) -> None:
    """Sets logging formatting to JSON.

    Optionally, it can append keyword arguments
    to an existing logger, so it is available across future log statements.

    Last keyword argument and value wins if duplicated.

    Parameters
    ----------
    append : bool, optional
        append keys provided to logger formatter, by default False
    formatter_options : dict, optional
        LambdaPowertoolsFormatter options to be propagated, by default {}
    """
    formatter_options = formatter_options or {}

    # There are 3 operational modes for this method
    ## 1. Register a Powertools for AWS Lambda (Python) Formatter for the first time
    ## 2. Append new keys to the current logger formatter; deprecated in favour of append_keys
    ## 3. Add new keys and discard existing to the registered formatter

    # Mode 1
    log_keys = {**self._default_log_keys, **keys}
    is_logger_preconfigured = getattr(self._logger, "init", False)
    if not is_logger_preconfigured:
        formatter = self.logger_formatter or LambdaPowertoolsFormatter(**formatter_options, **log_keys)
        self.registered_handler.setFormatter(formatter)

        # when using a custom Powertools for AWS Lambda (Python) Formatter
        # standard and custom keys that are not Powertools for AWS Lambda (Python) Formatter parameters
        # should be appended and custom keys that might happen to be Powertools for AWS Lambda (Python)
        # Formatter parameters should be discarded this prevents adding them as custom keys, for example,
        # `json_default=<callable>` see https://github.com/aws-powertools/powertools-lambda-python/issues/1263
        custom_keys = {k: v for k, v in log_keys.items() if k not in RESERVED_FORMATTER_CUSTOM_KEYS}
        return self.registered_formatter.append_keys(**custom_keys)

    # Mode 2 (legacy)
    if append:
        # Maintenance: Add deprecation warning for major version
        return self.append_keys(**keys)

    # Mode 3
    self.registered_formatter.clear_state()
    self.registered_formatter.append_keys(**log_keys)
def warning(self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Optional[Mapping[str, object]] = None, **kwargs: object) ‑> None
Expand source code
def warning(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = None,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Optional[Mapping[str, object]] = None,
    **kwargs: object,
) -> None:
    extra = extra or {}
    extra = {**extra, **kwargs}

    return self._logger.warning(
        msg,
        *args,
        exc_info=exc_info,
        stack_info=stack_info,
        stacklevel=stacklevel,
        extra=extra,
    )
class Metrics (service: str | None = None, namespace: str | None = None, provider: AmazonCloudWatchEMFProvider | None = None)

Metrics create an CloudWatch EMF object with up to 100 metrics

Use Metrics when you need to create multiple metrics that have dimensions in common (e.g. service_name="payment").

Metrics up to 100 metrics in memory and are shared across all its instances. That means it can be safely instantiated outside of a Lambda function, or anywhere else.

A decorator (log_metrics) is provided so metrics are published at the end of its execution. If more than 100 metrics are added at a given function execution, these metrics are serialized and published before adding a given metric to prevent metric truncation.

Example

Creates a few metrics and publish at the end of a function execution

from aws_lambda_powertools import Metrics

metrics = Metrics(namespace="ServerlessAirline", service="payment")

@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler():
    metrics.add_metric(name="BookingConfirmation", unit="Count", value=1)
    metrics.add_dimension(name="function_version", value="$LATEST")

    return True

Environment Variables

POWERTOOLS_METRICS_NAMESPACE : str metric namespace POWERTOOLS_SERVICE_NAME : str service name used for default dimension

Parameters

service : str, optional
service name to be used as metric dimension, by default "service_undefined"
namespace : str, optional
Namespace for metrics
provider : AmazonCloudWatchEMFProvider, optional
Pre-configured AmazonCloudWatchEMFProvider provider

Raises

MetricUnitError
When metric unit isn't supported by CloudWatch
MetricResolutionError
When metric resolution isn't supported by CloudWatch
MetricValueError
When metric value isn't a number
SchemaValidationError
When metric object fails EMF schema validation
Expand source code
class Metrics:
    """Metrics create an CloudWatch EMF object with up to 100 metrics

    Use Metrics when you need to create multiple metrics that have
    dimensions in common (e.g. service_name="payment").

    Metrics up to 100 metrics in memory and are shared across
    all its instances. That means it can be safely instantiated outside
    of a Lambda function, or anywhere else.

    A decorator (log_metrics) is provided so metrics are published at the end of its execution.
    If more than 100 metrics are added at a given function execution,
    these metrics are serialized and published before adding a given metric
    to prevent metric truncation.

    Example
    -------
    **Creates a few metrics and publish at the end of a function execution**

        from aws_lambda_powertools import Metrics

        metrics = Metrics(namespace="ServerlessAirline", service="payment")

        @metrics.log_metrics(capture_cold_start_metric=True)
        def lambda_handler():
            metrics.add_metric(name="BookingConfirmation", unit="Count", value=1)
            metrics.add_dimension(name="function_version", value="$LATEST")

            return True

    Environment variables
    ---------------------
    POWERTOOLS_METRICS_NAMESPACE : str
        metric namespace
    POWERTOOLS_SERVICE_NAME : str
        service name used for default dimension

    Parameters
    ----------
    service : str, optional
        service name to be used as metric dimension, by default "service_undefined"
    namespace : str, optional
        Namespace for metrics
    provider: AmazonCloudWatchEMFProvider, optional
        Pre-configured AmazonCloudWatchEMFProvider provider

    Raises
    ------
    MetricUnitError
        When metric unit isn't supported by CloudWatch
    MetricResolutionError
        When metric resolution isn't supported by CloudWatch
    MetricValueError
        When metric value isn't a number
    SchemaValidationError
        When metric object fails EMF schema validation
    """

    # NOTE: We use class attrs to share metrics data across instances
    # this allows customers to initialize Metrics() throughout their code base (and middlewares)
    # and not get caught by accident with metrics data loss, or data deduplication
    # e.g., m1 and m2 add metric ProductCreated, however m1 has 'version' dimension  but m2 doesn't
    # Result: ProductCreated is created twice as we now have 2 different EMF blobs
    _metrics: Dict[str, Any] = {}
    _dimensions: Dict[str, str] = {}
    _metadata: Dict[str, Any] = {}
    _default_dimensions: Dict[str, Any] = {}

    def __init__(
        self,
        service: str | None = None,
        namespace: str | None = None,
        provider: AmazonCloudWatchEMFProvider | None = None,
    ):
        self.metric_set = self._metrics
        self.metadata_set = self._metadata
        self.default_dimensions = self._default_dimensions
        self.dimension_set = self._dimensions

        self.dimension_set.update(**self._default_dimensions)

        if provider is None:
            self.provider = AmazonCloudWatchEMFProvider(
                namespace=namespace,
                service=service,
                metric_set=self.metric_set,
                dimension_set=self.dimension_set,
                metadata_set=self.metadata_set,
                default_dimensions=self._default_dimensions,
            )
        else:
            self.provider = provider

    def add_metric(
        self,
        name: str,
        unit: MetricUnit | str,
        value: float,
        resolution: MetricResolution | int = 60,
    ) -> None:
        self.provider.add_metric(name=name, unit=unit, value=value, resolution=resolution)

    def add_dimension(self, name: str, value: str) -> None:
        self.provider.add_dimension(name=name, value=value)

    def serialize_metric_set(
        self,
        metrics: Dict | None = None,
        dimensions: Dict | None = None,
        metadata: Dict | None = None,
    ) -> CloudWatchEMFOutput:
        return self.provider.serialize_metric_set(metrics=metrics, dimensions=dimensions, metadata=metadata)

    def add_metadata(self, key: str, value: Any) -> None:
        self.provider.add_metadata(key=key, value=value)

    def set_timestamp(self, timestamp: int):
        """
        Set the timestamp for the metric.

        Parameters:
        -----------
        timestamp: int | datetime.datetime
            The timestamp to create the metric.
            If an integer is provided, it is assumed to be the epoch time in milliseconds.
            If a datetime object is provided, it will be converted to epoch time in milliseconds.
        """
        self.provider.set_timestamp(timestamp=timestamp)

    def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None:
        self.provider.flush_metrics(raise_on_empty_metrics=raise_on_empty_metrics)

    def log_metrics(
        self,
        lambda_handler: AnyCallableT | None = None,
        capture_cold_start_metric: bool = False,
        raise_on_empty_metrics: bool = False,
        default_dimensions: Dict[str, str] | None = None,
        **kwargs,
    ):
        return self.provider.log_metrics(
            lambda_handler=lambda_handler,
            capture_cold_start_metric=capture_cold_start_metric,
            raise_on_empty_metrics=raise_on_empty_metrics,
            default_dimensions=default_dimensions,
            **kwargs,
        )

    def set_default_dimensions(self, **dimensions) -> None:
        self.provider.set_default_dimensions(**dimensions)
        """Persist dimensions across Lambda invocations

        Parameters
        ----------
        dimensions : Dict[str, Any], optional
            metric dimensions as key=value

        Example
        -------
        **Sets some default dimensions that will always be present across metrics and invocations**

            from aws_lambda_powertools import Metrics

            metrics = Metrics(namespace="ServerlessAirline", service="payment")
            metrics.set_default_dimensions(environment="demo", another="one")

            @metrics.log_metrics()
            def lambda_handler():
                return True
        """
        for name, value in dimensions.items():
            self.add_dimension(name, value)

        self.default_dimensions.update(**dimensions)

    def clear_default_dimensions(self) -> None:
        self.provider.default_dimensions.clear()
        self.default_dimensions.clear()

    def clear_metrics(self) -> None:
        self.provider.clear_metrics()

    # We now allow customers to bring their own instance
    # of the AmazonCloudWatchEMFProvider provider
    # So we need to define getter/setter for namespace and service properties
    # To access these attributes on the provider instance.
    @property
    def namespace(self):
        return self.provider.namespace

    @namespace.setter
    def namespace(self, namespace):
        self.provider.namespace = namespace

    @property
    def service(self):
        return self.provider.service

    @service.setter
    def service(self, service):
        self.provider.service = service

Instance variables

var namespace
Expand source code
@property
def namespace(self):
    return self.provider.namespace
var service
Expand source code
@property
def service(self):
    return self.provider.service

Methods

def add_dimension(self, name: str, value: str) ‑> None
Expand source code
def add_dimension(self, name: str, value: str) -> None:
    self.provider.add_dimension(name=name, value=value)
def add_metadata(self, key: str, value: Any) ‑> None
Expand source code
def add_metadata(self, key: str, value: Any) -> None:
    self.provider.add_metadata(key=key, value=value)
def add_metric(self, name: str, unit: MetricUnit | str, value: float, resolution: MetricResolution | int = 60) ‑> None
Expand source code
def add_metric(
    self,
    name: str,
    unit: MetricUnit | str,
    value: float,
    resolution: MetricResolution | int = 60,
) -> None:
    self.provider.add_metric(name=name, unit=unit, value=value, resolution=resolution)
def clear_default_dimensions(self) ‑> None
Expand source code
def clear_default_dimensions(self) -> None:
    self.provider.default_dimensions.clear()
    self.default_dimensions.clear()
def clear_metrics(self) ‑> None
Expand source code
def clear_metrics(self) -> None:
    self.provider.clear_metrics()
def flush_metrics(self, raise_on_empty_metrics: bool = False) ‑> None
Expand source code
def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None:
    self.provider.flush_metrics(raise_on_empty_metrics=raise_on_empty_metrics)
def log_metrics(self, lambda_handler: AnyCallableT | None = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, default_dimensions: Dict[str, str] | None = None, **kwargs)
Expand source code
def log_metrics(
    self,
    lambda_handler: AnyCallableT | None = None,
    capture_cold_start_metric: bool = False,
    raise_on_empty_metrics: bool = False,
    default_dimensions: Dict[str, str] | None = None,
    **kwargs,
):
    return self.provider.log_metrics(
        lambda_handler=lambda_handler,
        capture_cold_start_metric=capture_cold_start_metric,
        raise_on_empty_metrics=raise_on_empty_metrics,
        default_dimensions=default_dimensions,
        **kwargs,
    )
def serialize_metric_set(self, metrics: Dict | None = None, dimensions: Dict | None = None, metadata: Dict | None = None) ‑> CloudWatchEMFOutput
Expand source code
def serialize_metric_set(
    self,
    metrics: Dict | None = None,
    dimensions: Dict | None = None,
    metadata: Dict | None = None,
) -> CloudWatchEMFOutput:
    return self.provider.serialize_metric_set(metrics=metrics, dimensions=dimensions, metadata=metadata)
def set_default_dimensions(self, **dimensions) ‑> None
Expand source code
def set_default_dimensions(self, **dimensions) -> None:
    self.provider.set_default_dimensions(**dimensions)
    """Persist dimensions across Lambda invocations

    Parameters
    ----------
    dimensions : Dict[str, Any], optional
        metric dimensions as key=value

    Example
    -------
    **Sets some default dimensions that will always be present across metrics and invocations**

        from aws_lambda_powertools import Metrics

        metrics = Metrics(namespace="ServerlessAirline", service="payment")
        metrics.set_default_dimensions(environment="demo", another="one")

        @metrics.log_metrics()
        def lambda_handler():
            return True
    """
    for name, value in dimensions.items():
        self.add_dimension(name, value)

    self.default_dimensions.update(**dimensions)
def set_timestamp(self, timestamp: int)

Set the timestamp for the metric.

Parameters:

timestamp: int | datetime.datetime The timestamp to create the metric. If an integer is provided, it is assumed to be the epoch time in milliseconds. If a datetime object is provided, it will be converted to epoch time in milliseconds.

Expand source code
def set_timestamp(self, timestamp: int):
    """
    Set the timestamp for the metric.

    Parameters:
    -----------
    timestamp: int | datetime.datetime
        The timestamp to create the metric.
        If an integer is provided, it is assumed to be the epoch time in milliseconds.
        If a datetime object is provided, it will be converted to epoch time in milliseconds.
    """
    self.provider.set_timestamp(timestamp=timestamp)
class Tracer (service: Optional[str] = None, disabled: Optional[bool] = None, auto_patch: Optional[bool] = None, patch_modules: Optional[Sequence[str]] = None, provider: Optional[BaseProvider] = None)

Tracer using AWS-XRay to provide decorators with known defaults for Lambda functions

When running locally, it detects whether it's running via SAM CLI, and if it is it returns dummy segments/subsegments instead.

By default, it patches all available libraries supported by X-Ray SDK. Patching is automatically disabled when running locally via SAM CLI or by any other means.

Ref: https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/thirdparty.html

Tracer keeps a copy of its configuration as it can be instantiated more than once. This is useful when you are using your own middlewares and want to utilize an existing Tracer. Make sure to set auto_patch=False in subsequent Tracer instances to avoid double patching.

Environment Variables

POWERTOOLS_TRACE_DISABLED : str disable tracer (e.g. "true", "True", "TRUE") POWERTOOLS_SERVICE_NAME : str service name POWERTOOLS_TRACER_CAPTURE_RESPONSE : str disable auto-capture response as metadata (e.g. "true", "True", "TRUE") POWERTOOLS_TRACER_CAPTURE_ERROR : str disable auto-capture error as metadata (e.g. "true", "True", "TRUE")

Parameters

service : str
Service name that will be appended in all tracing metadata
auto_patch : bool
Patch existing imported modules during initialization, by default True
disabled : bool
Flag to explicitly disable tracing, useful when running/testing locally Env POWERTOOLS_TRACE_DISABLED="true"
patch_modules : Optional[Sequence[str]]
Tuple of modules supported by tracing provider to patch, by default all modules are patched
provider : BaseProvider
Tracing provider, by default it is aws_xray_sdk.core.xray_recorder

Returns

Tracer
Tracer instance with imported modules patched

Example

A Lambda function using Tracer

from aws_lambda_powertools import Tracer
tracer = Tracer(service="greeting")

@tracer.capture_method
def greeting(name: str) -> Dict:
    return {
        "name": name
    }

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> Dict:
    print("Received event from Lambda...")
    response = greeting(name="Heitor")
    return response

Booking Lambda function using Tracer that adds additional annotation/metadata

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
def confirm_booking(booking_id: str) -> Dict:
        resp = add_confirmation(booking_id)

        tracer.put_annotation("BookingConfirmation", resp["requestId"])
        tracer.put_metadata("Booking confirmation", resp)

        return resp

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> Dict:
    print("Received event from Lambda...")
    booking_id = event.get("booking_id")
    response = confirm_booking(booking_id=booking_id)
    return response

A Lambda function using service name via POWERTOOLS_SERVICE_NAME

export POWERTOOLS_SERVICE_NAME="booking"
from aws_lambda_powertools import Tracer
tracer = Tracer()

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> Dict:
    print("Received event from Lambda...")
    response = greeting(name="Lessa")
    return response

Reuse an existing instance of Tracer anywhere in the code

# lambda_handler.py
from aws_lambda_powertools import Tracer
tracer = Tracer()

@tracer.capture_lambda_handler
def handler(event: dict, context: Any) -> Dict:
    ...

# utils.py
from aws_lambda_powertools import Tracer
tracer = Tracer()
...

Limitations

  • Async handler not supported
Expand source code
class Tracer:
    """Tracer using AWS-XRay to provide decorators with known defaults for Lambda functions

    When running locally, it detects whether it's running via SAM CLI,
    and if it is it returns dummy segments/subsegments instead.

    By default, it patches all available libraries supported by X-Ray SDK. Patching is
    automatically disabled when running locally via SAM CLI or by any other means. \n
    Ref: https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/thirdparty.html

    Tracer keeps a copy of its configuration as it can be instantiated more than once. This
    is useful when you are using your own middlewares and want to utilize an existing Tracer.
    Make sure to set `auto_patch=False` in subsequent Tracer instances to avoid double patching.

    Environment variables
    ---------------------
    POWERTOOLS_TRACE_DISABLED : str
        disable tracer (e.g. `"true", "True", "TRUE"`)
    POWERTOOLS_SERVICE_NAME : str
        service name
    POWERTOOLS_TRACER_CAPTURE_RESPONSE : str
        disable auto-capture response as metadata (e.g. `"true", "True", "TRUE"`)
    POWERTOOLS_TRACER_CAPTURE_ERROR : str
        disable auto-capture error as metadata (e.g. `"true", "True", "TRUE"`)

    Parameters
    ----------
    service: str
        Service name that will be appended in all tracing metadata
    auto_patch: bool
        Patch existing imported modules during initialization, by default True
    disabled: bool
        Flag to explicitly disable tracing, useful when running/testing locally
        `Env POWERTOOLS_TRACE_DISABLED="true"`
    patch_modules: Optional[Sequence[str]]
        Tuple of modules supported by tracing provider to patch, by default all modules are patched
    provider: BaseProvider
        Tracing provider, by default it is aws_xray_sdk.core.xray_recorder

    Returns
    -------
    Tracer
        Tracer instance with imported modules patched

    Example
    -------
    **A Lambda function using Tracer**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="greeting")

        @tracer.capture_method
        def greeting(name: str) -> Dict:
            return {
                "name": name
            }

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            print("Received event from Lambda...")
            response = greeting(name="Heitor")
            return response

    **Booking Lambda function using Tracer that adds additional annotation/metadata**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        def confirm_booking(booking_id: str) -> Dict:
                resp = add_confirmation(booking_id)

                tracer.put_annotation("BookingConfirmation", resp["requestId"])
                tracer.put_metadata("Booking confirmation", resp)

                return resp

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            print("Received event from Lambda...")
            booking_id = event.get("booking_id")
            response = confirm_booking(booking_id=booking_id)
            return response

    **A Lambda function using service name via POWERTOOLS_SERVICE_NAME**

        export POWERTOOLS_SERVICE_NAME="booking"
        from aws_lambda_powertools import Tracer
        tracer = Tracer()

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            print("Received event from Lambda...")
            response = greeting(name="Lessa")
            return response

    **Reuse an existing instance of Tracer anywhere in the code**

        # lambda_handler.py
        from aws_lambda_powertools import Tracer
        tracer = Tracer()

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            ...

        # utils.py
        from aws_lambda_powertools import Tracer
        tracer = Tracer()
        ...

    Limitations
    -----------
    * Async handler not supported
    """

    _default_config: Dict[str, Any] = {
        "service": "",
        "disabled": False,
        "auto_patch": True,
        "patch_modules": None,
        "provider": None,
    }
    _config = copy.copy(_default_config)

    def __init__(
        self,
        service: Optional[str] = None,
        disabled: Optional[bool] = None,
        auto_patch: Optional[bool] = None,
        patch_modules: Optional[Sequence[str]] = None,
        provider: Optional[BaseProvider] = None,
    ):
        self.__build_config(
            service=service,
            disabled=disabled,
            auto_patch=auto_patch,
            patch_modules=patch_modules,
            provider=provider,
        )
        self.provider: BaseProvider = self._config["provider"]
        self.disabled = self._config["disabled"]
        self.service = self._config["service"]
        self.auto_patch = self._config["auto_patch"]

        if self.disabled:
            self._disable_tracer_provider()

        if self.auto_patch:
            self.patch(modules=patch_modules)

        if self._is_xray_provider():
            self._disable_xray_trace_batching()

    def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]):
        """Adds annotation to existing segment or subsegment

        Parameters
        ----------
        key : str
            Annotation key
        value : Union[str, numbers.Number, bool]
            Value for annotation

        Example
        -------
        Custom annotation for a pseudo service named payment

            tracer = Tracer(service="payment")
            tracer.put_annotation("PaymentStatus", "CONFIRMED")
        """
        if self.disabled:
            logger.debug("Tracing has been disabled, aborting put_annotation")
            return

        logger.debug(f"Annotating on key '{key}' with '{value}'")
        self.provider.put_annotation(key=key, value=value)

    def put_metadata(self, key: str, value: Any, namespace: Optional[str] = None):
        """Adds metadata to existing segment or subsegment

        Parameters
        ----------
        key : str
            Metadata key
        value : any
            Value for metadata
        namespace : str, optional
            Namespace that metadata will lie under, by default None

        Example
        -------
        Custom metadata for a pseudo service named payment

            tracer = Tracer(service="payment")
            response = collect_payment()
            tracer.put_metadata("Payment collection", response)
        """
        if self.disabled:
            logger.debug("Tracing has been disabled, aborting put_metadata")
            return

        namespace = namespace or self.service
        logger.debug(f"Adding metadata on key '{key}' with '{value}' at namespace '{namespace}'")
        self.provider.put_metadata(key=key, value=value, namespace=namespace)

    def patch(self, modules: Optional[Sequence[str]] = None):
        """Patch modules for instrumentation.

        Patches all supported modules by default if none are given.

        Parameters
        ----------
        modules : Optional[Sequence[str]]
            List of modules to be patched, optional by default
        """
        if self.disabled:
            logger.debug("Tracing has been disabled, aborting patch")
            return

        if modules is None:
            self.provider.patch_all()
        else:
            self.provider.patch(modules)

    def capture_lambda_handler(
        self,
        lambda_handler: Union[Callable[[Dict, Any], Any], Optional[Callable[[Dict, Any, Optional[Dict]], Any]]] = None,
        capture_response: Optional[bool] = None,
        capture_error: Optional[bool] = None,
    ):
        """Decorator to create subsegment for lambda handlers

        As Lambda follows (event, context) signature we can remove some of the boilerplate
        and also capture any exception any Lambda function throws or its response as metadata

        Parameters
        ----------
        lambda_handler : Callable
            Method to annotate on
        capture_response : bool, optional
            Instructs tracer to not include handler's response as metadata
        capture_error : bool, optional
            Instructs tracer to not include handler's error as metadata, by default True

        Example
        -------
        **Lambda function using capture_lambda_handler decorator**

            tracer = Tracer(service="payment")
            @tracer.capture_lambda_handler
            def handler(event, context):
                ...

        **Preventing Tracer to log response as metadata**

            tracer = Tracer(service="payment")
            @tracer.capture_lambda_handler(capture_response=False)
            def handler(event, context):
                ...

        Raises
        ------
        err
            Exception raised by method
        """
        # If handler is None we've been called with parameters
        # Return a partial function with args filled
        if lambda_handler is None:
            logger.debug("Decorator called with parameters")
            return functools.partial(
                self.capture_lambda_handler,
                capture_response=capture_response,
                capture_error=capture_error,
            )

        lambda_handler_name = lambda_handler.__name__
        capture_response = resolve_truthy_env_var_choice(
            env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"),
            choice=capture_response,
        )
        capture_error = resolve_truthy_env_var_choice(
            env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"),
            choice=capture_error,
        )

        @functools.wraps(lambda_handler)
        def decorate(event, context, **kwargs):
            with self.provider.in_subsegment(name=f"## {lambda_handler_name}") as subsegment:
                try:
                    logger.debug("Calling lambda handler")
                    response = lambda_handler(event, context, **kwargs)
                    logger.debug("Received lambda handler response successfully")
                    self._add_response_as_metadata(
                        method_name=lambda_handler_name,
                        data=response,
                        subsegment=subsegment,
                        capture_response=capture_response,
                    )
                except Exception as err:
                    logger.exception(f"Exception received from {lambda_handler_name}")
                    self._add_full_exception_as_metadata(
                        method_name=lambda_handler_name,
                        error=err,
                        subsegment=subsegment,
                        capture_error=capture_error,
                    )

                    raise
                finally:
                    global is_cold_start
                    logger.debug("Annotating cold start")
                    subsegment.put_annotation(key="ColdStart", value=is_cold_start)

                    if is_cold_start:
                        is_cold_start = False

                    if self.service:
                        subsegment.put_annotation(key="Service", value=self.service)

                return response

        return decorate

    # see #465
    @overload
    def capture_method(self, method: "AnyCallableT") -> "AnyCallableT": ...  # pragma: no cover

    @overload
    def capture_method(
        self,
        method: None = None,
        capture_response: Optional[bool] = None,
        capture_error: Optional[bool] = None,
    ) -> Callable[["AnyCallableT"], "AnyCallableT"]: ...  # pragma: no cover

    def capture_method(
        self,
        method: Optional[AnyCallableT] = None,
        capture_response: Optional[bool] = None,
        capture_error: Optional[bool] = None,
    ) -> AnyCallableT:
        """Decorator to create subsegment for arbitrary functions

        It also captures both response and exceptions as metadata
        and creates a subsegment named `## <method_module.method_qualifiedname>`
        # see here: [Qualified name for classes and functions](https://peps.python.org/pep-3155/)

        When running [async functions concurrently](https://docs.python.org/3/library/asyncio-task.html#id6),
        methods may impact each others subsegment, and can trigger
        and AlreadyEndedException from X-Ray due to async nature.

        For this use case, either use `capture_method` only where
        `async.gather` is called, or use `in_subsegment_async`
        context manager via our escape hatch mechanism - See examples.

        Parameters
        ----------
        method : Callable
            Method to annotate on
        capture_response : bool, optional
            Instructs tracer to not include method's response as metadata
        capture_error : bool, optional
            Instructs tracer to not include handler's error as metadata, by default True

        Example
        -------
        **Custom function using capture_method decorator**

            tracer = Tracer(service="payment")
            @tracer.capture_method
            def some_function()

        **Custom async method using capture_method decorator**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            async def confirm_booking(booking_id: str) -> Dict:
                resp = call_to_booking_service()

                tracer.put_annotation("BookingConfirmation", resp["requestId"])
                tracer.put_metadata("Booking confirmation", resp)

                return resp

            def lambda_handler(event: dict, context: Any) -> Dict:
                booking_id = event.get("booking_id")
                asyncio.run(confirm_booking(booking_id=booking_id))

        **Custom generator function using capture_method decorator**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            def bookings_generator(booking_id):
                resp = call_to_booking_service()
                yield resp[0]
                yield resp[1]

            def lambda_handler(event: dict, context: Any) -> Dict:
                gen = bookings_generator(booking_id=booking_id)
                result = list(gen)

        **Custom generator context manager using capture_method decorator**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            @contextlib.contextmanager
            def booking_actions(booking_id):
                resp = call_to_booking_service()
                yield "example result"
                cleanup_stuff()

            def lambda_handler(event: dict, context: Any) -> Dict:
                booking_id = event.get("booking_id")

                with booking_actions(booking_id=booking_id) as booking:
                    result = booking

        **Tracing nested async calls**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            async def get_identity():
                ...

            @tracer.capture_method
            async def long_async_call():
                ...

            @tracer.capture_method
            async def async_tasks():
                await get_identity()
                ret = await long_async_call()

                return { "task": "done", **ret }

        **Safely tracing concurrent async calls with decorator**

        This may not needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            async def get_identity():
                async with aioboto3.client("sts") as sts:
                    account = await sts.get_caller_identity()
                    return account

            async def long_async_call():
                ...

            @tracer.capture_method
            async def async_tasks():
                _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

                return { "task": "done", **ret }

        **Safely tracing each concurrent async calls with escape hatch**

        This may not needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            async def get_identity():
                async tracer.provider.in_subsegment_async("## get_identity"):
                    ...

            async def long_async_call():
                async tracer.provider.in_subsegment_async("## long_async_call"):
                    ...

            @tracer.capture_method
            async def async_tasks():
                _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

                return { "task": "done", **ret }

        Raises
        ------
        err
            Exception raised by method
        """
        # If method is None we've been called with parameters
        # Return a partial function with args filled
        if method is None:
            logger.debug("Decorator called with parameters")
            return cast(
                AnyCallableT,
                functools.partial(self.capture_method, capture_response=capture_response, capture_error=capture_error),
            )

        # Example: app.ClassA.get_all  # noqa ERA001
        # Valid characters can be found at http://docs.aws.amazon.com/xray/latest/devguide/xray-api-segmentdocuments.html
        method_name = sanitize_xray_segment_name(f"{method.__module__}.{method.__qualname__}")

        capture_response = resolve_truthy_env_var_choice(
            env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"),
            choice=capture_response,
        )
        capture_error = resolve_truthy_env_var_choice(
            env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"),
            choice=capture_error,
        )

        # Maintenance: Need a factory/builder here to simplify this now
        if inspect.iscoroutinefunction(method):
            return self._decorate_async_function(
                method=method,
                capture_response=capture_response,
                capture_error=capture_error,
                method_name=method_name,
            )
        elif inspect.isgeneratorfunction(method):
            return self._decorate_generator_function(
                method=method,
                capture_response=capture_response,
                capture_error=capture_error,
                method_name=method_name,
            )
        elif hasattr(method, "__wrapped__") and inspect.isgeneratorfunction(method.__wrapped__):
            return self._decorate_generator_function_with_context_manager(
                method=method,
                capture_response=capture_response,
                capture_error=capture_error,
                method_name=method_name,
            )
        else:
            return self._decorate_sync_function(
                method=method,
                capture_response=capture_response,
                capture_error=capture_error,
                method_name=method_name,
            )

    def _decorate_async_function(
        self,
        method: Callable,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ):
        @functools.wraps(method)
        async def decorate(*args, **kwargs):
            async with self.provider.in_subsegment_async(name=f"## {method_name}") as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    response = await method(*args, **kwargs)
                    self._add_response_as_metadata(
                        method_name=method_name,
                        data=response,
                        subsegment=subsegment,
                        capture_response=capture_response,
                    )
                except Exception as err:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name,
                        error=err,
                        subsegment=subsegment,
                        capture_error=capture_error,
                    )
                    raise

                return response

        return decorate

    def _decorate_generator_function(
        self,
        method: Callable,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ):
        @functools.wraps(method)
        def decorate(*args, **kwargs):
            with self.provider.in_subsegment(name=f"## {method_name}") as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    result = yield from method(*args, **kwargs)
                    self._add_response_as_metadata(
                        method_name=method_name,
                        data=result,
                        subsegment=subsegment,
                        capture_response=capture_response,
                    )
                except Exception as err:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name,
                        error=err,
                        subsegment=subsegment,
                        capture_error=capture_error,
                    )
                    raise

                return result

        return decorate

    def _decorate_generator_function_with_context_manager(
        self,
        method: Callable,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ):
        @functools.wraps(method)
        @contextlib.contextmanager
        def decorate(*args, **kwargs):
            with self.provider.in_subsegment(name=f"## {method_name}") as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    with method(*args, **kwargs) as return_val:
                        result = return_val
                        yield result
                    self._add_response_as_metadata(
                        method_name=method_name,
                        data=result,
                        subsegment=subsegment,
                        capture_response=capture_response,
                    )
                except Exception as err:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name,
                        error=err,
                        subsegment=subsegment,
                        capture_error=capture_error,
                    )
                    raise

        return decorate

    def _decorate_sync_function(
        self,
        method: AnyCallableT,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ) -> AnyCallableT:
        @functools.wraps(method)
        def decorate(*args, **kwargs):
            with self.provider.in_subsegment(name=f"## {method_name}") as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    response = method(*args, **kwargs)
                    self._add_response_as_metadata(
                        method_name=method_name,
                        data=response,
                        subsegment=subsegment,
                        capture_response=capture_response,
                    )
                except Exception as err:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name,
                        error=err,
                        subsegment=subsegment,
                        capture_error=capture_error,
                    )
                    raise

                return response

        return cast(AnyCallableT, decorate)

    def _add_response_as_metadata(
        self,
        method_name: Optional[str] = None,
        data: Optional[Any] = None,
        subsegment: Optional[BaseSegment] = None,
        capture_response: Optional[Union[bool, str]] = None,
    ):
        """Add response as metadata for given subsegment

        Parameters
        ----------
        method_name : str, optional
            method name to add as metadata key, by default None
        data : Any, optional
            data to add as subsegment metadata, by default None
        subsegment : BaseSegment, optional
            existing subsegment to add metadata on, by default None
        capture_response : bool, optional
            Do not include response as metadata
        """
        if data is None or not capture_response or subsegment is None:
            return

        subsegment.put_metadata(key=f"{method_name} response", value=data, namespace=self.service)

    def _add_full_exception_as_metadata(
        self,
        method_name: str,
        error: Exception,
        subsegment: BaseSegment,
        capture_error: Optional[bool] = None,
    ):
        """Add full exception object as metadata for given subsegment

        Parameters
        ----------
        method_name : str
            method name to add as metadata key, by default None
        error : Exception
            error to add as subsegment metadata, by default None
        subsegment : BaseSegment
            existing subsegment to add metadata on, by default None
        capture_error : bool, optional
            Do not include error as metadata, by default True
        """
        if not capture_error:
            return

        subsegment.put_metadata(key=f"{method_name} error", value=error, namespace=self.service)

    @staticmethod
    def _disable_tracer_provider():
        """Forcefully disables tracing"""
        logger.debug("Disabling tracer provider...")
        aws_xray_sdk.global_sdk_config.set_sdk_enabled(False)

    @staticmethod
    def _is_tracer_disabled() -> Union[bool, str]:
        """Detects whether trace has been disabled

        Tracing is automatically disabled in the following conditions:

        1. Explicitly disabled via `TRACE_DISABLED` environment variable
        2. Running in Lambda Emulators, or locally where X-Ray Daemon will not be listening
        3. Explicitly disabled via constructor e.g `Tracer(disabled=True)`

        Returns
        -------
        Union[bool, str]
        """
        logger.debug("Verifying whether Tracing has been disabled")
        is_lambda_env = os.getenv(constants.LAMBDA_TASK_ROOT_ENV)
        is_lambda_sam_cli = os.getenv(constants.SAM_LOCAL_ENV)
        is_chalice_cli = os.getenv(constants.CHALICE_LOCAL_ENV)
        is_disabled = resolve_truthy_env_var_choice(env=os.getenv(constants.TRACER_DISABLED_ENV, "false"))

        if is_disabled:
            logger.debug("Tracing has been disabled via env var POWERTOOLS_TRACE_DISABLED")
            return is_disabled

        if not is_lambda_env or (is_lambda_sam_cli or is_chalice_cli):
            logger.debug("Running outside Lambda env; disabling Tracing")
            return True

        return False

    def __build_config(
        self,
        service: Optional[str] = None,
        disabled: Optional[bool] = None,
        auto_patch: Optional[bool] = None,
        patch_modules: Optional[Sequence[str]] = None,
        provider: Optional[BaseProvider] = None,
    ):
        """Populates Tracer config for new and existing initializations"""
        is_disabled = disabled if disabled is not None else self._is_tracer_disabled()
        is_service = resolve_env_var_choice(choice=service, env=os.getenv(constants.SERVICE_NAME_ENV))

        # Logic: Choose overridden option first, previously cached config, or default if available
        self._config["provider"] = provider or self._config["provider"] or self._patch_xray_provider()
        self._config["auto_patch"] = auto_patch if auto_patch is not None else self._config["auto_patch"]
        self._config["service"] = is_service or self._config["service"]
        self._config["disabled"] = is_disabled or self._config["disabled"]
        self._config["patch_modules"] = patch_modules or self._config["patch_modules"]

    @classmethod
    def _reset_config(cls):
        cls._config = copy.copy(cls._default_config)

    def _patch_xray_provider(self):
        # Due to Lazy Import, we need to activate `core` attrib via import
        # we also need to include `patch`, `patch_all` methods
        # to ensure patch calls are done via the provider
        from aws_xray_sdk.core import xray_recorder  # type: ignore

        provider = xray_recorder
        provider.patch = aws_xray_sdk.core.patch
        provider.patch_all = aws_xray_sdk.core.patch_all

        return provider

    def _disable_xray_trace_batching(self):
        """Configure X-Ray SDK to send subsegment individually over batching
        Known issue: https://github.com/aws-powertools/powertools-lambda-python/issues/283
        """
        if self.disabled:
            logger.debug("Tracing has been disabled, aborting streaming override")
            return

        aws_xray_sdk.core.xray_recorder.configure(streaming_threshold=0)

    def _is_xray_provider(self):
        return "aws_xray_sdk" in self.provider.__module__

    def ignore_endpoint(self, hostname: Optional[str] = None, urls: Optional[List[str]] = None):
        """If you want to ignore certain httplib requests you can do so based on the hostname or URL that is being
        requested.

        > NOTE: If the provider is not xray, nothing will be added to ignore list

        Documentation
        --------------
        - https://github.com/aws/aws-xray-sdk-python#ignoring-httplib-requests

        Parameters
        ----------
        hostname : Optional, str
            The hostname is matched using the Python fnmatch library which does Unix glob style matching.
        urls: Optional, List[str]
            List of urls to ignore. Example `tracer.ignore_endpoint(urls=["/ignored-url"])`
        """
        if not self._is_xray_provider():
            return

        from aws_xray_sdk.ext.httplib import add_ignored  # type: ignore

        add_ignored(hostname=hostname, urls=urls)

Methods

def capture_lambda_handler(self, lambda_handler: Union[Callable[[Dict, Any], Any], Callable[[Dict, Any, Optional[Dict]], Any], ForwardRef(None)] = None, capture_response: Optional[bool] = None, capture_error: Optional[bool] = None)

Decorator to create subsegment for lambda handlers

As Lambda follows (event, context) signature we can remove some of the boilerplate and also capture any exception any Lambda function throws or its response as metadata

Parameters

lambda_handler : Callable
Method to annotate on
capture_response : bool, optional
Instructs tracer to not include handler's response as metadata
capture_error : bool, optional
Instructs tracer to not include handler's error as metadata, by default True

Example

Lambda function using capture_lambda_handler decorator

tracer = Tracer(service="payment")
@tracer.capture_lambda_handler
def handler(event, context):
    ...

Preventing Tracer to log response as metadata

tracer = Tracer(service="payment")
@tracer.capture_lambda_handler(capture_response=False)
def handler(event, context):
    ...

Raises

err
Exception raised by method
Expand source code
def capture_lambda_handler(
    self,
    lambda_handler: Union[Callable[[Dict, Any], Any], Optional[Callable[[Dict, Any, Optional[Dict]], Any]]] = None,
    capture_response: Optional[bool] = None,
    capture_error: Optional[bool] = None,
):
    """Decorator to create subsegment for lambda handlers

    As Lambda follows (event, context) signature we can remove some of the boilerplate
    and also capture any exception any Lambda function throws or its response as metadata

    Parameters
    ----------
    lambda_handler : Callable
        Method to annotate on
    capture_response : bool, optional
        Instructs tracer to not include handler's response as metadata
    capture_error : bool, optional
        Instructs tracer to not include handler's error as metadata, by default True

    Example
    -------
    **Lambda function using capture_lambda_handler decorator**

        tracer = Tracer(service="payment")
        @tracer.capture_lambda_handler
        def handler(event, context):
            ...

    **Preventing Tracer to log response as metadata**

        tracer = Tracer(service="payment")
        @tracer.capture_lambda_handler(capture_response=False)
        def handler(event, context):
            ...

    Raises
    ------
    err
        Exception raised by method
    """
    # If handler is None we've been called with parameters
    # Return a partial function with args filled
    if lambda_handler is None:
        logger.debug("Decorator called with parameters")
        return functools.partial(
            self.capture_lambda_handler,
            capture_response=capture_response,
            capture_error=capture_error,
        )

    lambda_handler_name = lambda_handler.__name__
    capture_response = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"),
        choice=capture_response,
    )
    capture_error = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"),
        choice=capture_error,
    )

    @functools.wraps(lambda_handler)
    def decorate(event, context, **kwargs):
        with self.provider.in_subsegment(name=f"## {lambda_handler_name}") as subsegment:
            try:
                logger.debug("Calling lambda handler")
                response = lambda_handler(event, context, **kwargs)
                logger.debug("Received lambda handler response successfully")
                self._add_response_as_metadata(
                    method_name=lambda_handler_name,
                    data=response,
                    subsegment=subsegment,
                    capture_response=capture_response,
                )
            except Exception as err:
                logger.exception(f"Exception received from {lambda_handler_name}")
                self._add_full_exception_as_metadata(
                    method_name=lambda_handler_name,
                    error=err,
                    subsegment=subsegment,
                    capture_error=capture_error,
                )

                raise
            finally:
                global is_cold_start
                logger.debug("Annotating cold start")
                subsegment.put_annotation(key="ColdStart", value=is_cold_start)

                if is_cold_start:
                    is_cold_start = False

                if self.service:
                    subsegment.put_annotation(key="Service", value=self.service)

            return response

    return decorate
def capture_method(self, method: Optional[~AnyCallableT] = None, capture_response: Optional[bool] = None, capture_error: Optional[bool] = None) ‑> ~AnyCallableT

Decorator to create subsegment for arbitrary functions

It also captures both response and exceptions as metadata and creates a subsegment named ## <method_module.method_qualifiedname>

see here: Qualified name for classes and functions

When running async functions concurrently, methods may impact each others subsegment, and can trigger and AlreadyEndedException from X-Ray due to async nature.

For this use case, either use capture_method only where async.gather is called, or use in_subsegment_async context manager via our escape hatch mechanism - See examples.

Parameters

method : Callable
Method to annotate on
capture_response : bool, optional
Instructs tracer to not include method's response as metadata
capture_error : bool, optional
Instructs tracer to not include handler's error as metadata, by default True

Example

Custom function using capture_method decorator

tracer = Tracer(service="payment")
@tracer.capture_method
def some_function()

Custom async method using capture_method decorator

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
async def confirm_booking(booking_id: str) -> Dict:
    resp = call_to_booking_service()

    tracer.put_annotation("BookingConfirmation", resp["requestId"])
    tracer.put_metadata("Booking confirmation", resp)

    return resp

def lambda_handler(event: dict, context: Any) -> Dict:
    booking_id = event.get("booking_id")
    asyncio.run(confirm_booking(booking_id=booking_id))

Custom generator function using capture_method decorator

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
def bookings_generator(booking_id):
    resp = call_to_booking_service()
    yield resp[0]
    yield resp[1]

def lambda_handler(event: dict, context: Any) -> Dict:
    gen = bookings_generator(booking_id=booking_id)
    result = list(gen)

Custom generator context manager using capture_method decorator

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
@contextlib.contextmanager
def booking_actions(booking_id):
    resp = call_to_booking_service()
    yield "example result"
    cleanup_stuff()

def lambda_handler(event: dict, context: Any) -> Dict:
    booking_id = event.get("booking_id")

    with booking_actions(booking_id=booking_id) as booking:
        result = booking

Tracing nested async calls

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

@tracer.capture_method
async def get_identity():
    ...

@tracer.capture_method
async def long_async_call():
    ...

@tracer.capture_method
async def async_tasks():
    await get_identity()
    ret = await long_async_call()

    return { "task": "done", **ret }

Safely tracing concurrent async calls with decorator

This may not needed once this bug is closed

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

async def get_identity():
    async with aioboto3.client("sts") as sts:
        account = await sts.get_caller_identity()
        return account

async def long_async_call():
    ...

@tracer.capture_method
async def async_tasks():
    _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

    return { "task": "done", **ret }

Safely tracing each concurrent async calls with escape hatch

This may not needed once this bug is closed

from aws_lambda_powertools import Tracer
tracer = Tracer(service="booking")

async def get_identity():
    async tracer.provider.in_subsegment_async("## get_identity"):
        ...

async def long_async_call():
    async tracer.provider.in_subsegment_async("## long_async_call"):
        ...

@tracer.capture_method
async def async_tasks():
    _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

    return { "task": "done", **ret }

Raises

err
Exception raised by method
Expand source code
def capture_method(
    self,
    method: Optional[AnyCallableT] = None,
    capture_response: Optional[bool] = None,
    capture_error: Optional[bool] = None,
) -> AnyCallableT:
    """Decorator to create subsegment for arbitrary functions

    It also captures both response and exceptions as metadata
    and creates a subsegment named `## <method_module.method_qualifiedname>`
    # see here: [Qualified name for classes and functions](https://peps.python.org/pep-3155/)

    When running [async functions concurrently](https://docs.python.org/3/library/asyncio-task.html#id6),
    methods may impact each others subsegment, and can trigger
    and AlreadyEndedException from X-Ray due to async nature.

    For this use case, either use `capture_method` only where
    `async.gather` is called, or use `in_subsegment_async`
    context manager via our escape hatch mechanism - See examples.

    Parameters
    ----------
    method : Callable
        Method to annotate on
    capture_response : bool, optional
        Instructs tracer to not include method's response as metadata
    capture_error : bool, optional
        Instructs tracer to not include handler's error as metadata, by default True

    Example
    -------
    **Custom function using capture_method decorator**

        tracer = Tracer(service="payment")
        @tracer.capture_method
        def some_function()

    **Custom async method using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        async def confirm_booking(booking_id: str) -> Dict:
            resp = call_to_booking_service()

            tracer.put_annotation("BookingConfirmation", resp["requestId"])
            tracer.put_metadata("Booking confirmation", resp)

            return resp

        def lambda_handler(event: dict, context: Any) -> Dict:
            booking_id = event.get("booking_id")
            asyncio.run(confirm_booking(booking_id=booking_id))

    **Custom generator function using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        def bookings_generator(booking_id):
            resp = call_to_booking_service()
            yield resp[0]
            yield resp[1]

        def lambda_handler(event: dict, context: Any) -> Dict:
            gen = bookings_generator(booking_id=booking_id)
            result = list(gen)

    **Custom generator context manager using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        @contextlib.contextmanager
        def booking_actions(booking_id):
            resp = call_to_booking_service()
            yield "example result"
            cleanup_stuff()

        def lambda_handler(event: dict, context: Any) -> Dict:
            booking_id = event.get("booking_id")

            with booking_actions(booking_id=booking_id) as booking:
                result = booking

    **Tracing nested async calls**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        async def get_identity():
            ...

        @tracer.capture_method
        async def long_async_call():
            ...

        @tracer.capture_method
        async def async_tasks():
            await get_identity()
            ret = await long_async_call()

            return { "task": "done", **ret }

    **Safely tracing concurrent async calls with decorator**

    This may not needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        async def get_identity():
            async with aioboto3.client("sts") as sts:
                account = await sts.get_caller_identity()
                return account

        async def long_async_call():
            ...

        @tracer.capture_method
        async def async_tasks():
            _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

            return { "task": "done", **ret }

    **Safely tracing each concurrent async calls with escape hatch**

    This may not needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        async def get_identity():
            async tracer.provider.in_subsegment_async("## get_identity"):
                ...

        async def long_async_call():
            async tracer.provider.in_subsegment_async("## long_async_call"):
                ...

        @tracer.capture_method
        async def async_tasks():
            _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

            return { "task": "done", **ret }

    Raises
    ------
    err
        Exception raised by method
    """
    # If method is None we've been called with parameters
    # Return a partial function with args filled
    if method is None:
        logger.debug("Decorator called with parameters")
        return cast(
            AnyCallableT,
            functools.partial(self.capture_method, capture_response=capture_response, capture_error=capture_error),
        )

    # Example: app.ClassA.get_all  # noqa ERA001
    # Valid characters can be found at http://docs.aws.amazon.com/xray/latest/devguide/xray-api-segmentdocuments.html
    method_name = sanitize_xray_segment_name(f"{method.__module__}.{method.__qualname__}")

    capture_response = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"),
        choice=capture_response,
    )
    capture_error = resolve_truthy_env_var_choice(
        env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"),
        choice=capture_error,
    )

    # Maintenance: Need a factory/builder here to simplify this now
    if inspect.iscoroutinefunction(method):
        return self._decorate_async_function(
            method=method,
            capture_response=capture_response,
            capture_error=capture_error,
            method_name=method_name,
        )
    elif inspect.isgeneratorfunction(method):
        return self._decorate_generator_function(
            method=method,
            capture_response=capture_response,
            capture_error=capture_error,
            method_name=method_name,
        )
    elif hasattr(method, "__wrapped__") and inspect.isgeneratorfunction(method.__wrapped__):
        return self._decorate_generator_function_with_context_manager(
            method=method,
            capture_response=capture_response,
            capture_error=capture_error,
            method_name=method_name,
        )
    else:
        return self._decorate_sync_function(
            method=method,
            capture_response=capture_response,
            capture_error=capture_error,
            method_name=method_name,
        )
def ignore_endpoint(self, hostname: Optional[str] = None, urls: Optional[List[str]] = None)

If you want to ignore certain httplib requests you can do so based on the hostname or URL that is being requested.

NOTE: If the provider is not xray, nothing will be added to ignore list

Documentation

Parameters

hostname : Optional, str
The hostname is matched using the Python fnmatch library which does Unix glob style matching.
urls : Optional, List[str]
List of urls to ignore. Example tracer.ignore_endpoint(urls=["/ignored-url"])
Expand source code
def ignore_endpoint(self, hostname: Optional[str] = None, urls: Optional[List[str]] = None):
    """If you want to ignore certain httplib requests you can do so based on the hostname or URL that is being
    requested.

    > NOTE: If the provider is not xray, nothing will be added to ignore list

    Documentation
    --------------
    - https://github.com/aws/aws-xray-sdk-python#ignoring-httplib-requests

    Parameters
    ----------
    hostname : Optional, str
        The hostname is matched using the Python fnmatch library which does Unix glob style matching.
    urls: Optional, List[str]
        List of urls to ignore. Example `tracer.ignore_endpoint(urls=["/ignored-url"])`
    """
    if not self._is_xray_provider():
        return

    from aws_xray_sdk.ext.httplib import add_ignored  # type: ignore

    add_ignored(hostname=hostname, urls=urls)
def patch(self, modules: Optional[Sequence[str]] = None)

Patch modules for instrumentation.

Patches all supported modules by default if none are given.

Parameters

modules : Optional[Sequence[str]]
List of modules to be patched, optional by default
Expand source code
def patch(self, modules: Optional[Sequence[str]] = None):
    """Patch modules for instrumentation.

    Patches all supported modules by default if none are given.

    Parameters
    ----------
    modules : Optional[Sequence[str]]
        List of modules to be patched, optional by default
    """
    if self.disabled:
        logger.debug("Tracing has been disabled, aborting patch")
        return

    if modules is None:
        self.provider.patch_all()
    else:
        self.provider.patch(modules)
def put_annotation(self, key: str, value: Union[str, numbers.Number, bool])

Adds annotation to existing segment or subsegment

Parameters

key : str
Annotation key
value : Union[str, numbers.Number, bool]
Value for annotation

Example

Custom annotation for a pseudo service named payment

tracer = Tracer(service="payment")
tracer.put_annotation("PaymentStatus", "CONFIRMED")
Expand source code
def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]):
    """Adds annotation to existing segment or subsegment

    Parameters
    ----------
    key : str
        Annotation key
    value : Union[str, numbers.Number, bool]
        Value for annotation

    Example
    -------
    Custom annotation for a pseudo service named payment

        tracer = Tracer(service="payment")
        tracer.put_annotation("PaymentStatus", "CONFIRMED")
    """
    if self.disabled:
        logger.debug("Tracing has been disabled, aborting put_annotation")
        return

    logger.debug(f"Annotating on key '{key}' with '{value}'")
    self.provider.put_annotation(key=key, value=value)
def put_metadata(self, key: str, value: Any, namespace: Optional[str] = None)

Adds metadata to existing segment or subsegment

Parameters

key : str
Metadata key
value : any
Value for metadata
namespace : str, optional
Namespace that metadata will lie under, by default None

Example

Custom metadata for a pseudo service named payment

tracer = Tracer(service="payment")
response = collect_payment()
tracer.put_metadata("Payment collection", response)
Expand source code
def put_metadata(self, key: str, value: Any, namespace: Optional[str] = None):
    """Adds metadata to existing segment or subsegment

    Parameters
    ----------
    key : str
        Metadata key
    value : any
        Value for metadata
    namespace : str, optional
        Namespace that metadata will lie under, by default None

    Example
    -------
    Custom metadata for a pseudo service named payment

        tracer = Tracer(service="payment")
        response = collect_payment()
        tracer.put_metadata("Payment collection", response)
    """
    if self.disabled:
        logger.debug("Tracing has been disabled, aborting put_metadata")
        return

    namespace = namespace or self.service
    logger.debug(f"Adding metadata on key '{key}' with '{value}' at namespace '{namespace}'")
    self.provider.put_metadata(key=key, value=value, namespace=namespace)