Package aws_lambda_powertools
Top-level package for Lambda Python Powertools.
Expand source code
# -*- coding: utf-8 -*-
"""Top-level package for Lambda Python Powertools."""
from pathlib import Path
from .logging import Logger
from .metrics import Metrics, single_metric
from .package_logger import set_package_logger_handler
from .tracing import Tracer
__author__ = """Amazon Web Services"""

# Names exposed as the package's public API (what `from aws_lambda_powertools import *` provides).
__all__ = [
    "Logger",
    "Metrics",
    "single_metric",
    "Tracer",
]

# Directory that contains this package's __init__ module.
PACKAGE_PATH = Path(__file__).parent

# Executed once, at import time.
# NOTE(review): presumably configures the handler of the library's own internal
# logger -- confirm against aws_lambda_powertools.package_logger.
set_package_logger_handler()
Sub-modules
aws_lambda_powertools.event_handler
-
Event handler decorators for common Lambda events
aws_lambda_powertools.exceptions
-
Shared exceptions that don't belong to a single utility
aws_lambda_powertools.logging
-
Logging utility
aws_lambda_powertools.metrics
-
CloudWatch Embedded Metric Format utility
aws_lambda_powertools.middleware_factory
-
Utilities to enhance middlewares
aws_lambda_powertools.package_logger
aws_lambda_powertools.shared
aws_lambda_powertools.tracing
-
Tracing utility
aws_lambda_powertools.utilities
-
General utilities for Powertools
Functions
def single_metric(name: str, unit: MetricUnit, value: float, namespace: Optional[str] = None) ‑> Generator[SingleMetric, None, None]
-
Context manager to simplify creation of a single metric
Example
Creates cold start metric with function_version as dimension
from aws_lambda_powertools import single_metric
from aws_lambda_powertools.metrics import MetricUnit

with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, namespace="ServerlessAirline") as metric:
    metric.add_dimension(name="function_version", value="47")
Same as above but set namespace using environment variable
$ export POWERTOOLS_METRICS_NAMESPACE="ServerlessAirline"

from aws_lambda_powertools import single_metric
from aws_lambda_powertools.metrics import MetricUnit

with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1) as metric:
    metric.add_dimension(name="function_version", value="47")
Parameters
name
:str
- Metric name
unit
:MetricUnit
aws_lambda_powertools.helper.models.MetricUnit
value
:float
- Metric value
namespace
:str
- Namespace for metrics
Yields
SingleMetric
- SingleMetric class instance
Raises
MetricUnitError
- When metric unit isn't supported by CloudWatch
MetricValueError
- When metric value isn't a number
SchemaValidationError
- When metric object fails EMF schema validation
Expand source code
@contextmanager
def single_metric(
    name: str, unit: MetricUnit, value: float, namespace: Optional[str] = None
) -> Generator[SingleMetric, None, None]:
    """Context manager to simplify creation of a single metric

    The metric is serialized and printed to standard output as compact JSON
    when the ``with`` block exits normally. If creating the metric, the
    ``with`` body, or the serialization raises, nothing is printed and the
    exception propagates unchanged.

    Example
    -------
    **Creates cold start metric with function_version as dimension**

        from aws_lambda_powertools import single_metric
        from aws_lambda_powertools.metrics import MetricUnit

        with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, namespace="ServerlessAirline") as metric:
            metric.add_dimension(name="function_version", value="47")

    **Same as above but set namespace using environment variable**

        $ export POWERTOOLS_METRICS_NAMESPACE="ServerlessAirline"

        from aws_lambda_powertools import single_metric
        from aws_lambda_powertools.metrics import MetricUnit

        with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1) as metric:
            metric.add_dimension(name="function_version", value="47")

    Parameters
    ----------
    name : str
        Metric name
    unit : MetricUnit
        `aws_lambda_powertools.helper.models.MetricUnit`
    value : float
        Metric value
    namespace: str
        Namespace for metrics

    Yields
    -------
    SingleMetric
        SingleMetric class instance

    Raises
    ------
    MetricUnitError
        When metric unit isn't supported by CloudWatch
    MetricValueError
        When metric value isn't a number
    SchemaValidationError
        When metric object fails EMF schema validation
    """
    metric_set: Optional[Dict] = None
    try:
        metric: SingleMetric = SingleMetric(namespace=namespace)
        metric.add_metric(name=name, unit=unit, value=value)
        yield metric
        # Only reached when the caller's `with` body finished without raising.
        metric_set = metric.serialize_metric_set()
    finally:
        # metric_set is still None when an exception interrupted the steps above;
        # skip the print in that case so a bare "null" line is never emitted.
        if metric_set is not None:
            print(json.dumps(metric_set, separators=(",", ":")))
Classes
class Logger (service: Optional[str] = None, level: Union[str, int, None] = None, child: bool = False, sampling_rate: Optional[float] = None, stream: Optional[IO[str]] = None, logger_formatter: Optional[PowertoolsFormatter] = None, logger_handler: Optional[logging.Handler] = None, log_uncaught_exceptions: bool = False, json_serializer: Optional[Callable[[Dict], str]] = None, json_deserializer: Optional[Callable[[Union[Dict, str, bool, int, float]], str]] = None, json_default: Optional[Callable[[Any], Any]] = None, datefmt: Optional[str] = None, use_datetime_directive: bool = False, log_record_order: Optional[List[str]] = None, utc: bool = False, use_rfc3339: bool = False, **kwargs)
-
Creates and setups a logger to format statements in JSON.
Includes service name and any additional key=value into logs. It also accepts both service name or level explicitly via env vars.
Environment Variables
POWERTOOLS_SERVICE_NAME : str
- service name
LOG_LEVEL : str
- logging level (e.g. INFO, DEBUG)
POWERTOOLS_LOGGER_SAMPLE_RATE : float
- sampling rate ranging from 0 to 1, 1 being 100% sampling
Parameters
service
:str
, optional- service name to be appended in logs, by default "service_undefined"
level
:str, int optional
- logging.level, by default "INFO"
child
:bool
, optional- create a child Logger named <service>.<caller_file_name>, False by default
sampling_rate
:float
, optional- sample rate for debug calls within execution context defaults to 0.0
stream
:sys.stdout
, optional- valid output for a logging stream, by default sys.stdout
logger_formatter
:PowertoolsFormatter
, optional- custom logging formatter that implements PowertoolsFormatter
logger_handler
:logging.Handler
, optional- custom logging handler e.g. logging.FileHandler("file.log")
log_uncaught_exceptions
:bool, by default False
-
logs uncaught exception using sys.excepthook
See: https://docs.python.org/3/library/sys.html#sys.excepthook
Parameters Propagated To LambdaPowertoolsFormatter
datefmt : str, optional
- String directives (strftime) to format log timestamp using `time`, by default it uses 2021-05-03 11:47:12,494+0200.
use_datetime_directive : bool, optional
- Interpret `datefmt` as a format string for `datetime.datetime.strftime`, rather than `time.strftime`. See <https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior>. This also supports a custom %F directive for milliseconds.
use_rfc3339 : bool, optional
- Whether to use a popular date format that complies with both RFC3339 and ISO8601. e.g., 2022-10-27T16:27:43.738+02:00.
json_serializer : Callable, optional
- function to serialize `obj` to a JSON formatted `str`, by default json.dumps
json_deserializer : Callable, optional
- function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, by default json.loads
json_default : Callable, optional
- function to coerce unserializable values, by default `str()`. Only used when no custom formatter is set
utc : bool, optional
- set logging timestamp to UTC, by default False to continue to use local time as per stdlib
log_record_order : list, optional
- set order of log keys when logging, by default ["level", "location", "message", "timestamp"]
Example
Setups structured logging in JSON for Lambda functions with explicit service name
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> def handler(event, context):
        logger.info("Hello")
Setups structured logging in JSON for Lambda functions using env vars
$ export POWERTOOLS_SERVICE_NAME="payment"
$ export POWERTOOLS_LOGGER_SAMPLE_RATE=0.01 # 1% debug sampling

>>> from aws_lambda_powertools import Logger
>>> logger = Logger()
>>>
>>> def handler(event, context):
        logger.info("Hello")
Append payment_id to previously setup logger
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> def handler(event, context):
        logger.append_keys(payment_id=event["payment_id"])
        logger.info("Hello")
Create child Logger using logging inheritance via child param
>>> # app.py
>>> import another_file
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> # another_file.py
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment", child=True)
Logging in UTC timezone
>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", utc=True)
Brings message as the first key in log statements
>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", log_record_order=["message"])
Logging to a file instead of standard output for testing
>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", logger_handler=logging.FileHandler("log.json"))
Raises
InvalidLoggerSamplingRateError
- When sampling rate provided is not a float
Initialize the logger with a name and an optional level.
Expand source code
class Logger(logging.Logger):  # lgtm [py/missing-call-to-init]
    """Creates and setups a logger to format statements in JSON.

    Includes service name and any additional key=value into logs
    It also accepts both service name or level explicitly via env vars

    Environment variables
    ---------------------
    POWERTOOLS_SERVICE_NAME : str
        service name
    LOG_LEVEL: str
        logging level (e.g. INFO, DEBUG)
    POWERTOOLS_LOGGER_SAMPLE_RATE: float
        sampling rate ranging from 0 to 1, 1 being 100% sampling

    Parameters
    ----------
    service : str, optional
        service name to be appended in logs, by default "service_undefined"
    level : str, int optional
        logging.level, by default "INFO"
    child: bool, optional
        create a child Logger named <service>.<caller_file_name>, False by default
    sampling_rate: float, optional
        sample rate for debug calls within execution context defaults to 0.0
    stream: sys.stdout, optional
        valid output for a logging stream, by default sys.stdout
    logger_formatter: PowertoolsFormatter, optional
        custom logging formatter that implements PowertoolsFormatter
    logger_handler: logging.Handler, optional
        custom logging handler e.g. logging.FileHandler("file.log")
    log_uncaught_exceptions: bool, by default False
        logs uncaught exception using sys.excepthook

        See: https://docs.python.org/3/library/sys.html#sys.excepthook

    Parameters propagated to LambdaPowertoolsFormatter
    --------------------------------------------------
    datefmt: str, optional
        String directives (strftime) to format log timestamp using `time`, by default it uses 2021-05-03 11:47:12,494+0200. # noqa: E501
    use_datetime_directive: bool, optional
        Interpret `datefmt` as a format string for `datetime.datetime.strftime`, rather than
        `time.strftime`.

        See https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior . This
        also supports a custom %F directive for milliseconds.
    use_rfc3339: bool, optional
        Whether to use a popular date format that complies with both RFC3339 and ISO8601.
        e.g., 2022-10-27T16:27:43.738+02:00.
    json_serializer : Callable, optional
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer : Callable, optional
        function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`,
        by default json.loads
    json_default : Callable, optional
        function to coerce unserializable values, by default `str()`

        Only used when no custom formatter is set
    utc : bool, optional
        set logging timestamp to UTC, by default False to continue to use local time as per stdlib
    log_record_order : list, optional
        set order of log keys when logging, by default ["level", "location", "message", "timestamp"]

    Example
    -------
    **Setups structured logging in JSON for Lambda functions with explicit service name**

        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment")
        >>>
        >>> def handler(event, context):
                logger.info("Hello")

    **Setups structured logging in JSON for Lambda functions using env vars**

        $ export POWERTOOLS_SERVICE_NAME="payment"
        $ export POWERTOOLS_LOGGER_SAMPLE_RATE=0.01 # 1% debug sampling

        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger()
        >>>
        >>> def handler(event, context):
                logger.info("Hello")

    **Append payment_id to previously setup logger**

        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment")
        >>>
        >>> def handler(event, context):
                logger.append_keys(payment_id=event["payment_id"])
                logger.info("Hello")

    **Create child Logger using logging inheritance via child param**

        >>> # app.py
        >>> import another_file
        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment")
        >>>
        >>> # another_file.py
        >>> from aws_lambda_powertools import Logger
        >>> logger = Logger(service="payment", child=True)

    **Logging in UTC timezone**

        >>> # app.py
        >>> import logging
        >>> from aws_lambda_powertools import Logger
        >>>
        >>> logger = Logger(service="payment", utc=True)

    **Brings message as the first key in log statements**

        >>> # app.py
        >>> import logging
        >>> from aws_lambda_powertools import Logger
        >>>
        >>> logger = Logger(service="payment", log_record_order=["message"])

    **Logging to a file instead of standard output for testing**

        >>> # app.py
        >>> import logging
        >>> from aws_lambda_powertools import Logger
        >>>
        >>> logger = Logger(service="payment", logger_handler=logging.FileHandler("log.json"))

    Raises
    ------
    InvalidLoggerSamplingRateError
        When sampling rate provided is not a float
    """

    def __init__(
        self,
        service: Optional[str] = None,
        level: Union[str, int, None] = None,
        child: bool = False,
        sampling_rate: Optional[float] = None,
        stream: Optional[IO[str]] = None,
        logger_formatter: Optional[PowertoolsFormatter] = None,
        logger_handler: Optional[logging.Handler] = None,
        log_uncaught_exceptions: bool = False,
        json_serializer: Optional[Callable[[Dict], str]] = None,
        json_deserializer: Optional[Callable[[Union[Dict, str, bool, int, float]], str]] = None,
        json_default: Optional[Callable[[Any], Any]] = None,
        datefmt: Optional[str] = None,
        use_datetime_directive: bool = False,
        log_record_order: Optional[List[str]] = None,
        utc: bool = False,
        use_rfc3339: bool = False,
        **kwargs,
    ):
        self.service = resolve_env_var_choice(
            choice=service, env=os.getenv(constants.SERVICE_NAME_ENV, "service_undefined")
        )
        self.sampling_rate = resolve_env_var_choice(
            choice=sampling_rate, env=os.getenv(constants.LOGGER_LOG_SAMPLING_RATE)
        )
        self.child = child
        self.logger_formatter = logger_formatter
        self.logger_handler = logger_handler or logging.StreamHandler(stream)
        self.log_uncaught_exceptions = log_uncaught_exceptions
        self.log_level = self._get_log_level(level)
        self._is_deduplication_disabled = resolve_truthy_env_var_choice(
            env=os.getenv(constants.LOGGER_LOG_DEDUPLICATION_ENV, "false")
        )
        self._default_log_keys = {"service": self.service, "sampling_rate": self.sampling_rate}
        # NOTE: must be called directly from __init__ -- _get_caller_filename() walks a
        # fixed number of frames back from here to find the module creating the Logger
        self._logger = self._get_logger()

        # NOTE: This is primarily to improve UX, so IDEs can autocomplete LambdaPowertoolsFormatter options
        # previously, we masked all of them as kwargs thus limiting feature discovery
        formatter_options = {
            "json_serializer": json_serializer,
            "json_deserializer": json_deserializer,
            "json_default": json_default,
            "datefmt": datefmt,
            "use_datetime_directive": use_datetime_directive,
            "log_record_order": log_record_order,
            "utc": utc,
            "use_rfc3339": use_rfc3339,
        }

        self._init_logger(formatter_options=formatter_options, **kwargs)

        if self.log_uncaught_exceptions:
            logger.debug("Replacing exception hook")
            sys.excepthook = functools.partial(log_uncaught_exception_hook, logger=self)

    # Prevent __getattr__ from shielding unknown attribute errors in type checkers
    # https://github.com/awslabs/aws-lambda-powertools-python/issues/1660
    if not TYPE_CHECKING:

        def __getattr__(self, name):
            # Proxy attributes not found to actual logger to support backward compatibility
            # https://github.com/awslabs/aws-lambda-powertools-python/issues/97
            return getattr(self._logger, name)

    def _get_logger(self):
        """Returns a Logger named {self.service}, or {self.service.filename} for child loggers"""
        logger_name = self.service
        if self.child:
            logger_name = f"{self.service}.{self._get_caller_filename()}"

        return logging.getLogger(logger_name)

    def _init_logger(self, formatter_options: Optional[Dict] = None, **kwargs):
        """Configures new logger"""

        # Skip configuration if it's a child logger or a pre-configured logger
        # to prevent the following:
        #   a) multiple handlers being attached
        #   b) different sampling mechanisms
        #   c) multiple messages from being logged as handlers can be duplicated
        is_logger_preconfigured = getattr(self._logger, "init", False)
        if self.child or is_logger_preconfigured:
            return

        self._configure_sampling()
        self._logger.setLevel(self.log_level)
        self._logger.addHandler(self.logger_handler)
        self.structure_logs(formatter_options=formatter_options, **kwargs)

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        self._logger.findCaller = self.findCaller

        # Pytest Live Log feature duplicates log records for colored output
        # but we explicitly add a filter for log deduplication.
        # This flag disables this protection when you explicit want logs to be duplicated (#262)
        if not self._is_deduplication_disabled:
            logger.debug("Adding filter in root logger to suppress child logger records to bubble up")
            for handler in logging.root.handlers:
                # It'll add a filter to suppress any child logger from self.service
                # Example: `Logger(service="order")`, where service is Order
                # It'll reject all loggers starting with `order` e.g. order.checkout, order.shared
                handler.addFilter(SuppressFilter(self.service))

        # as per bug in #249, we should not be pre-configuring an existing logger
        # therefore we set a custom attribute in the Logger that will be returned
        # std logging will return the same Logger with our attribute if name is reused
        logger.debug(f"Marking logger {self.service} as preconfigured")
        self._logger.init = True

    def _configure_sampling(self):
        """Dynamically set log level based on sampling rate

        Raises
        ------
        InvalidLoggerSamplingRateError
            When sampling rate provided is not a float
        """
        try:
            if self.sampling_rate and random.random() <= float(self.sampling_rate):
                logger.debug("Setting log level to Debug due to sampling rate")
                self.log_level = logging.DEBUG
        except ValueError:
            # Trailing space after "instead." keeps the two sentences from running together
            raise InvalidLoggerSamplingRateError(
                f"Expected a float value ranging 0 to 1, but received {self.sampling_rate} instead. "
                "Please review POWERTOOLS_LOGGER_SAMPLE_RATE environment variable."
            )

    @overload
    def inject_lambda_context(
        self,
        lambda_handler: AnyCallableT,
        log_event: Optional[bool] = None,
        correlation_id_path: Optional[str] = None,
        clear_state: Optional[bool] = False,
    ) -> AnyCallableT:
        ...

    @overload
    def inject_lambda_context(
        self,
        lambda_handler: None = None,
        log_event: Optional[bool] = None,
        correlation_id_path: Optional[str] = None,
        clear_state: Optional[bool] = False,
    ) -> Callable[[AnyCallableT], AnyCallableT]:
        ...

    def inject_lambda_context(
        self,
        lambda_handler: Optional[AnyCallableT] = None,
        log_event: Optional[bool] = None,
        correlation_id_path: Optional[str] = None,
        clear_state: Optional[bool] = False,
    ) -> Any:
        """Decorator to capture Lambda contextual info and inject into logger

        Parameters
        ----------
        clear_state : bool, optional
            Instructs logger to remove any custom keys previously added
        lambda_handler : Callable
            Method to inject the lambda context
        log_event : bool, optional
            Instructs logger to log Lambda Event, by default False
        correlation_id_path: str, optional
            Optional JMESPath for the correlation_id

        Environment variables
        ---------------------
        POWERTOOLS_LOGGER_LOG_EVENT : str
            instruct logger to log Lambda Event (e.g. `"true", "True", "TRUE"`)

        Example
        -------
        **Captures Lambda contextual runtime info (e.g memory, arn, req_id)**

            from aws_lambda_powertools import Logger

            logger = Logger(service="payment")

            @logger.inject_lambda_context
            def handler(event, context):
                logger.info("Hello")

        **Captures Lambda contextual runtime info and logs incoming request**

            from aws_lambda_powertools import Logger

            logger = Logger(service="payment")

            @logger.inject_lambda_context(log_event=True)
            def handler(event, context):
                logger.info("Hello")

        Returns
        -------
        decorate : Callable
            Decorated lambda handler
        """

        # If handler is None we've been called with parameters
        # Return a partial function with args filled
        if lambda_handler is None:
            logger.debug("Decorator called with parameters")
            return functools.partial(
                self.inject_lambda_context,
                log_event=log_event,
                correlation_id_path=correlation_id_path,
                clear_state=clear_state,
            )

        log_event = resolve_truthy_env_var_choice(
            env=os.getenv(constants.LOGGER_LOG_EVENT_ENV, "false"), choice=log_event
        )

        @functools.wraps(lambda_handler)
        def decorate(event, context, *args, **kwargs):
            lambda_context = build_lambda_context_model(context)
            cold_start = _is_cold_start()

            if clear_state:
                self.structure_logs(cold_start=cold_start, **lambda_context.__dict__)
            else:
                self.append_keys(cold_start=cold_start, **lambda_context.__dict__)

            if correlation_id_path:
                self.set_correlation_id(jmespath.search(correlation_id_path, event))

            if log_event:
                logger.debug("Event received")
                # Log the `raw_event` attribute when the event object exposes one, else the event itself
                self.info(getattr(event, "raw_event", event))

            return lambda_handler(event, context, *args, **kwargs)

        return decorate

    def info(
        self,
        msg: object,
        *args,
        exc_info=None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        """Log `msg` via the wrapped logger's `info`; additional keyword arguments are merged into `extra`"""
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.info(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.info(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def error(
        self,
        msg: object,
        *args,
        exc_info=None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        """Log `msg` via the wrapped logger's `error`; additional keyword arguments are merged into `extra`"""
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.error(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.error(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def exception(
        self,
        msg: object,
        *args,
        exc_info=True,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        """Log `msg` via the wrapped logger's `exception`; additional keyword arguments are merged into `extra`"""
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.exception(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.exception(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def critical(
        self,
        msg: object,
        *args,
        exc_info=None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        """Log `msg` via the wrapped logger's `critical`; additional keyword arguments are merged into `extra`"""
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.critical(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.critical(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def warning(
        self,
        msg: object,
        *args,
        exc_info=None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        """Log `msg` via the wrapped logger's `warning`; additional keyword arguments are merged into `extra`"""
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.warning(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.warning(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def debug(
        self,
        msg: object,
        *args,
        exc_info=None,
        stack_info: bool = False,
        stacklevel: int = 2,
        extra: Optional[Mapping[str, object]] = None,
        **kwargs,
    ):
        """Log `msg` via the wrapped logger's `debug`; additional keyword arguments are merged into `extra`"""
        extra = extra or {}
        extra = {**extra, **kwargs}

        # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
        if sys.version_info < (3, 8):  # pragma: no cover
            return self._logger.debug(msg, *args, exc_info=exc_info, stack_info=stack_info, extra=extra)
        return self._logger.debug(
            msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
        )

    def append_keys(self, **additional_keys):
        """Forward the given key=value pairs to the registered formatter's `append_keys`"""
        self.registered_formatter.append_keys(**additional_keys)

    def remove_keys(self, keys: Iterable[str]):
        """Forward the given keys to the registered formatter's `remove_keys`"""
        self.registered_formatter.remove_keys(keys)

    @property
    def registered_handler(self) -> logging.Handler:
        """Convenience property to access logger handler"""
        # Child loggers are never given a handler by _init_logger, so look at the parent's handlers
        handlers = self._logger.parent.handlers if self.child else self._logger.handlers
        return handlers[0]

    @property
    def registered_formatter(self) -> BasePowertoolsFormatter:
        """Convenience property to access logger formatter"""
        return self.registered_handler.formatter  # type: ignore

    def structure_logs(self, append: bool = False, formatter_options: Optional[Dict] = None, **keys):
        """Sets logging formatting to JSON.

        Optionally, it can append keyword arguments
        to an existing logger, so it is available across future log statements.

        Last keyword argument and value wins if duplicated.

        Parameters
        ----------
        append : bool, optional
            append keys provided to logger formatter, by default False
        formatter_options : dict, optional
            LambdaPowertoolsFormatter options to be propagated, by default {}
        """
        formatter_options = formatter_options or {}

        # There are 3 operational modes for this method
        ## 1. Register a Powertools Formatter for the first time
        ## 2. Append new keys to the current logger formatter; deprecated in favour of append_keys
        ## 3. Add new keys and discard existing to the registered formatter

        # Mode 1
        log_keys = {**self._default_log_keys, **keys}
        is_logger_preconfigured = getattr(self._logger, "init", False)
        if not is_logger_preconfigured:
            formatter = self.logger_formatter or LambdaPowertoolsFormatter(**formatter_options, **log_keys)  # type: ignore # noqa: E501
            self.registered_handler.setFormatter(formatter)

            # when using a custom Lambda Powertools Formatter
            # standard and custom keys that are not Powertools Formatter parameters should be appended
            # and custom keys that might happen to be Powertools Formatter parameters should be discarded
            # this prevents adding them as custom keys, for example, `json_default=<callable>`
            # see https://github.com/awslabs/aws-lambda-powertools-python/issues/1263
            custom_keys = {k: v for k, v in log_keys.items() if k not in RESERVED_FORMATTER_CUSTOM_KEYS}
            return self.registered_formatter.append_keys(**custom_keys)

        # Mode 2 (legacy)
        if append:
            # Maintenance: Add deprecation warning for major version
            return self.append_keys(**keys)

        # Mode 3
        self.registered_formatter.clear_state()
        self.registered_formatter.append_keys(**log_keys)

    def set_correlation_id(self, value: Optional[str]):
        """Sets the correlation_id in the logging json

        Parameters
        ----------
        value : str, optional
            Value for the correlation id. None will remove the correlation_id
        """
        self.append_keys(correlation_id=value)

    def get_correlation_id(self) -> Optional[str]:
        """Gets the correlation_id in the logging json

        Returns
        -------
        str, optional
            Value for the correlation id
        """
        if isinstance(self.registered_formatter, LambdaPowertoolsFormatter):
            return self.registered_formatter.log_format.get("correlation_id")
        return None

    @staticmethod
    def _get_log_level(level: Union[str, int, None]) -> Union[str, int]:
        """Returns preferred log level set by the customer in upper case"""
        if isinstance(level, int):
            return level

        # An explicit level wins over the LOG_LEVEL environment variable
        log_level: Optional[str] = level or os.getenv("LOG_LEVEL")
        if log_level is None:
            return logging.INFO

        return log_level.upper()

    @staticmethod
    def _get_caller_filename():
        """Return caller filename by finding the caller frame"""
        # Current frame         => _get_logger()
        # Previous frame        => logger.py
        # Before previous frame => Caller
        frame = inspect.currentframe()
        caller_frame = frame.f_back.f_back.f_back
        return caller_frame.f_globals["__name__"]

    # Maintenance: We can drop this upon Py3.7 EOL. It's a backport for "location" key to work
    def findCaller(self, stack_info=False, stacklevel=2):  # pragma: no cover
        """
        Find the stack frame of the caller so that we can note the source
        file name, line number and function name.
        """
        f = logging.currentframe()  # noqa: VNE001
        # On some versions of IronPython, currentframe() returns None if
        # IronPython isn't run with -X:Frames.
        if f is None:
            return "(unknown file)", 0, "(unknown function)", None
        while stacklevel > 0:
            next_f = f.f_back
            if next_f is None:
                ## We've got options here.
                ## If we want to use the last (deepest) frame:
                break
                ## If we want to mimic the warnings module:
                # return ("sys", 1, "(unknown function)", None) # noqa: E800
                ## If we want to be pedantic:  # noqa: E800
                # raise ValueError("call stack is not deep enough") # noqa: E800
            f = next_f  # noqa: VNE001
            if not _is_internal_frame(f):
                stacklevel -= 1
        co = f.f_code
        sinfo = None
        if stack_info:
            with io.StringIO() as sio:
                sio.write("Stack (most recent call last):\n")
                traceback.print_stack(f, file=sio)
                sinfo = sio.getvalue()
                if sinfo[-1] == "\n":
                    sinfo = sinfo[:-1]
        return co.co_filename, f.f_lineno, co.co_name, sinfo
Ancestors
- Logger
- logging.Filterer
Methods
def append_keys(self, **additional_keys)
-
Expand source code
def append_keys(self, **additional_keys):
    """Forward the given key=value pairs to the formatter registered on this logger."""
    formatter = self.registered_formatter
    formatter.append_keys(**additional_keys)
def remove_keys(self, keys: Iterable[str])
-
Expand source code
def remove_keys(self, keys: Iterable[str]):
    """Forward the given keys to the `remove_keys` method of the formatter registered on this logger."""
    formatter = self.registered_formatter
    formatter.remove_keys(keys)
Inherited members
class Metrics (service: Optional[str] = None, namespace: Optional[str] = None)
-
Metrics create an EMF object with up to 100 metrics
Use Metrics when you need to create multiple metrics that have dimensions in common (e.g. service_name="payment").
Metrics are kept in memory, up to 100 at a time, and are shared across all instances of this class. That means it can be safely instantiated outside of a Lambda function, or anywhere else.
A decorator (log_metrics) is provided so metrics are published at the end of its execution. If more than 100 metrics are added at a given function execution, these metrics are serialized and published before adding a given metric to prevent metric truncation.
Example
Creates a few metrics and publish at the end of a function execution
from aws_lambda_powertools import Metrics

metrics = Metrics(namespace="ServerlessAirline", service="payment")

@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler():
    metrics.add_metric(name="BookingConfirmation", unit="Count", value=1)
    metrics.add_dimension(name="function_version", value="$LATEST")

    return True
Environment Variables
POWERTOOLS_METRICS_NAMESPACE : str
- metric namespace
POWERTOOLS_SERVICE_NAME : str
- service name used for default dimension
Parameters
service
:str
, optional- service name to be used as metric dimension, by default "service_undefined"
namespace
:str
, optional- Namespace for metrics
Raises
MetricUnitError
- When metric unit isn't supported by CloudWatch
MetricValueError
- When metric value isn't a number
SchemaValidationError
- When metric object fails EMF schema validation
Expand source code
class Metrics(MetricManager):
    """Build a single EMF object holding up to 100 metrics

    Reach for Metrics when several metrics share the same dimensions
    (e.g. service_name="payment").

    Metric data is kept in memory (up to 100 metrics) and is shared by every instance
    of this class, so it is safe to instantiate Metrics outside of a Lambda function,
    or anywhere else.

    The log_metrics decorator publishes the metrics once the decorated function finishes.
    When more than 100 metrics are added during one function execution, the collected
    metrics are serialized and published before the next metric is added, which
    prevents metric truncation.

    Example
    -------
    **Creates a few metrics and publish at the end of a function execution**

        from aws_lambda_powertools import Metrics

        metrics = Metrics(namespace="ServerlessAirline", service="payment")

        @metrics.log_metrics(capture_cold_start_metric=True)
        def lambda_handler():
            metrics.add_metric(name="BookingConfirmation", unit="Count", value=1)
            metrics.add_dimension(name="function_version", value="$LATEST")

            return True

    Environment variables
    ---------------------
    POWERTOOLS_METRICS_NAMESPACE : str
        metric namespace
    POWERTOOLS_SERVICE_NAME : str
        service name used for default dimension

    Parameters
    ----------
    service : str, optional
        service name to be used as metric dimension, by default "service_undefined"
    namespace : str, optional
        Namespace for metrics

    Raises
    ------
    MetricUnitError
        When metric unit isn't supported by CloudWatch
    MetricValueError
        When metric value isn't a number
    SchemaValidationError
        When metric object fails EMF schema validation
    """

    # NOTE: Metric data lives in class attributes so that every instance shares it.
    # Customers can therefore create Metrics() anywhere in their code base (and middlewares)
    # without accidentally losing metrics data or splitting it across EMF blobs,
    # e.g., m1 and m2 both add metric ProductCreated, but only m1 has a 'version' dimension
    # Result without sharing: ProductCreated is created twice, in 2 different EMF blobs
    _metrics: Dict[str, Any] = {}
    _dimensions: Dict[str, str] = {}
    _metadata: Dict[str, Any] = {}
    _default_dimensions: Dict[str, Any] = {}

    def __init__(self, service: Optional[str] = None, namespace: Optional[str] = None):
        # Point the instance at the class-level (shared) containers
        self.default_dimensions = self._default_dimensions
        self.metadata_set = self._metadata
        self.metric_set = self._metrics
        self.dimension_set = self._dimensions

        # Seed the shared dimensions with whatever defaults are already registered
        self.dimension_set.update(**self._default_dimensions)

        super().__init__(
            metric_set=self.metric_set,
            dimension_set=self.dimension_set,
            metadata_set=self.metadata_set,
            namespace=namespace,
            service=service,
        )

    def set_default_dimensions(self, **dimensions) -> None:
        """Persist dimensions across Lambda invocations

        Parameters
        ----------
        dimensions : Dict[str, Any], optional
            metric dimensions as key=value

        Example
        -------
        **Sets some default dimensions that will always be present across metrics and invocations**

            from aws_lambda_powertools import Metrics

            metrics = Metrics(namespace="ServerlessAirline", service="payment")
            metrics.set_default_dimensions(environment="demo", another="one")

            @metrics.log_metrics()
            def lambda_handler():
                return True
        """
        # Register each dimension first; defaults are only recorded once all were accepted
        for dimension_name in dimensions:
            self.add_dimension(dimension_name, dimensions[dimension_name])

        self.default_dimensions.update(**dimensions)

    def clear_default_dimensions(self) -> None:
        """Empty the shared default-dimensions mapping in place."""
        defaults = self.default_dimensions
        defaults.clear()

    def clear_metrics(self) -> None:
        """Run the parent's clear_metrics, then register the default dimensions again."""
        super().clear_metrics()

        # re-add default dimensions
        defaults = self.default_dimensions
        self.set_default_dimensions(**defaults)
Ancestors
Methods
def clear_default_dimensions(self) ‑> None
-
Expand source code
def clear_default_dimensions(self) -> None:
    """Empty the default-dimensions mapping in place (the same dict object is kept)."""
    defaults = self.default_dimensions
    defaults.clear()
def clear_metrics(self) ‑> None
-
Expand source code
def clear_metrics(self) -> None:
    """Run the parent's clear_metrics, then register the default dimensions again."""
    super().clear_metrics()

    # re-add default dimensions
    defaults = self.default_dimensions
    self.set_default_dimensions(**defaults)
def set_default_dimensions(self, **dimensions) ‑> None
-
Persist dimensions across Lambda invocations
Parameters
dimensions
:Dict[str, Any]
, optional- metric dimensions as key=value
Example
Sets some default dimensions that will always be present across metrics and invocations
from aws_lambda_powertools import Metrics metrics = Metrics(namespace="ServerlessAirline", service="payment") metrics.set_default_dimensions(environment="demo", another="one") @metrics.log_metrics() def lambda_handler(): return True
Expand source code
def set_default_dimensions(self, **dimensions) -> None:
    """Persist dimensions across Lambda invocations

    Parameters
    ----------
    dimensions : Dict[str, Any], optional
        metric dimensions as key=value

    Example
    -------
    **Sets some default dimensions that will always be present across metrics and invocations**

        from aws_lambda_powertools import Metrics

        metrics = Metrics(namespace="ServerlessAirline", service="payment")
        metrics.set_default_dimensions(environment="demo", another="one")

        @metrics.log_metrics()
        def lambda_handler():
            return True
    """
    # Register each dimension first; defaults are only recorded once all were accepted
    for dimension_name in dimensions:
        self.add_dimension(dimension_name, dimensions[dimension_name])

    self.default_dimensions.update(**dimensions)
Inherited members
class Tracer (service: Optional[str] = None, disabled: Optional[bool] = None, auto_patch: Optional[bool] = None, patch_modules: Optional[Sequence[str]] = None, provider: Optional[BaseProvider] = None)
-
Tracer using AWS-XRay to provide decorators with known defaults for Lambda functions
When running locally, it detects whether it's running via SAM CLI, and if it is it returns dummy segments/subsegments instead.
By default, it patches all available libraries supported by X-Ray SDK. Patching is automatically disabled when running locally via SAM CLI or by any other means.
Ref: https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/thirdparty.html
Tracer keeps a copy of its configuration as it can be instantiated more than once. This is useful when you are using your own middlewares and want to utilize an existing Tracer. Make sure to set
auto_patch=False
in subsequent Tracer instances to avoid double patching.
Environment Variables
POWERTOOLS_TRACE_DISABLED : str disable tracer (e.g. "true", "True", "TRUE")
POWERTOOLS_SERVICE_NAME : str service name
POWERTOOLS_TRACER_CAPTURE_RESPONSE : str toggle auto-capture of response as metadata (e.g. "true", "True", "TRUE"); read as "true" when unset
POWERTOOLS_TRACER_CAPTURE_ERROR : str toggle auto-capture of error as metadata (e.g. "true", "True", "TRUE"); read as "true" when unset
Parameters
service
:str
- Service name that will be appended in all tracing metadata
auto_patch
:bool
- Patch existing imported modules during initialization, by default True
disabled
:bool
- Flag to explicitly disable tracing, useful when running/testing locally
Env POWERTOOLS_TRACE_DISABLED="true"
patch_modules
:Optional[Sequence[str]]
- Tuple of modules supported by tracing provider to patch, by default all modules are patched
provider
:BaseProvider
- Tracing provider, by default it is aws_xray_sdk.core.xray_recorder
Returns
Tracer
- Tracer instance with imported modules patched
Example
A Lambda function using Tracer
from aws_lambda_powertools import Tracer tracer = Tracer(service="greeting") @tracer.capture_method def greeting(name: str) -> Dict: return { "name": name } @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> Dict: print("Received event from Lambda...") response = greeting(name="Heitor") return response
Booking Lambda function using Tracer that adds additional annotation/metadata
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method def confirm_booking(booking_id: str) -> Dict: resp = add_confirmation(booking_id) tracer.put_annotation("BookingConfirmation", resp["requestId"]) tracer.put_metadata("Booking confirmation", resp) return resp @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> Dict: print("Received event from Lambda...") booking_id = event.get("booking_id") response = confirm_booking(booking_id=booking_id) return response
A Lambda function using service name via POWERTOOLS_SERVICE_NAME
export POWERTOOLS_SERVICE_NAME="booking" from aws_lambda_powertools import Tracer tracer = Tracer() @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> Dict: print("Received event from Lambda...") response = greeting(name="Lessa") return response
Reuse an existing instance of Tracer anywhere in the code
# lambda_handler.py from aws_lambda_powertools import Tracer tracer = Tracer() @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> Dict: ... # utils.py from aws_lambda_powertools import Tracer tracer = Tracer() ...
Limitations
- Async handler not supported
Expand source code
class Tracer:
    """Tracer using AWS-XRay to provide decorators with known defaults for Lambda functions

    When running locally, it detects whether it's running via SAM CLI,
    and if it is it returns dummy segments/subsegments instead.

    By default, it patches all available libraries supported by X-Ray SDK. Patching is
    automatically disabled when running locally via SAM CLI or by any other means. \n
    Ref: https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/thirdparty.html

    Tracer keeps a copy of its configuration as it can be instantiated more than once. This
    is useful when you are using your own middlewares and want to utilize an existing Tracer.
    Make sure to set `auto_patch=False` in subsequent Tracer instances to avoid double patching.

    Environment variables
    ---------------------
    POWERTOOLS_TRACE_DISABLED : str
        disable tracer (e.g. `"true", "True", "TRUE"`)
    POWERTOOLS_SERVICE_NAME : str
        service name
    POWERTOOLS_TRACER_CAPTURE_RESPONSE : str
        toggle auto-capture of response as metadata (e.g. `"true", "True", "TRUE"`),
        read as `"true"` when unset
    POWERTOOLS_TRACER_CAPTURE_ERROR : str
        toggle auto-capture of error as metadata (e.g. `"true", "True", "TRUE"`),
        read as `"true"` when unset

    Parameters
    ----------
    service: str
        Service name that will be appended in all tracing metadata
    auto_patch: bool
        Patch existing imported modules during initialization, by default True
    disabled: bool
        Flag to explicitly disable tracing, useful when running/testing locally
        `Env POWERTOOLS_TRACE_DISABLED="true"`
    patch_modules: Optional[Sequence[str]]
        Tuple of modules supported by tracing provider to patch, by default all modules are patched
    provider: BaseProvider
        Tracing provider, by default it is aws_xray_sdk.core.xray_recorder

    Returns
    -------
    Tracer
        Tracer instance with imported modules patched

    Example
    -------
    **A Lambda function using Tracer**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="greeting")

        @tracer.capture_method
        def greeting(name: str) -> Dict:
            return {
                "name": name
            }

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            print("Received event from Lambda...")
            response = greeting(name="Heitor")
            return response

    **Booking Lambda function using Tracer that adds additional annotation/metadata**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        def confirm_booking(booking_id: str) -> Dict:
            resp = add_confirmation(booking_id)

            tracer.put_annotation("BookingConfirmation", resp["requestId"])
            tracer.put_metadata("Booking confirmation", resp)

            return resp

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            print("Received event from Lambda...")
            booking_id = event.get("booking_id")
            response = confirm_booking(booking_id=booking_id)
            return response

    **A Lambda function using service name via POWERTOOLS_SERVICE_NAME**

        export POWERTOOLS_SERVICE_NAME="booking"
        from aws_lambda_powertools import Tracer
        tracer = Tracer()

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            print("Received event from Lambda...")
            response = greeting(name="Lessa")
            return response

    **Reuse an existing instance of Tracer anywhere in the code**

        # lambda_handler.py
        from aws_lambda_powertools import Tracer
        tracer = Tracer()

        @tracer.capture_lambda_handler
        def handler(event: dict, context: Any) -> Dict:
            ...

        # utils.py
        from aws_lambda_powertools import Tracer
        tracer = Tracer()
        ...

    Limitations
    -----------
    * Async handler not supported
    """

    # Baseline configuration; `_reset_config` restores `_config` from a copy of this dict
    _default_config: Dict[str, Any] = {
        "service": "",
        "disabled": False,
        "auto_patch": True,
        "patch_modules": None,
        "provider": None,
    }
    # Class-level, so values resolved by one Tracer() are visible to later instances
    _config = copy.copy(_default_config)

    def __init__(
        self,
        service: Optional[str] = None,
        disabled: Optional[bool] = None,
        auto_patch: Optional[bool] = None,
        patch_modules: Optional[Sequence[str]] = None,
        provider: Optional[BaseProvider] = None,
    ):
        """Resolve the class-level config, copy it onto the instance and apply it

        After the config is built, the tracer provider is disabled when `disabled` resolved
        truthy, modules are patched when `auto_patch` resolved truthy, and X-Ray trace
        batching is turned off when the provider comes from `aws_xray_sdk`.
        """
        self.__build_config(
            service=service, disabled=disabled, auto_patch=auto_patch, patch_modules=patch_modules, provider=provider
        )
        self.provider: BaseProvider = self._config["provider"]
        self.disabled = self._config["disabled"]
        self.service = self._config["service"]
        self.auto_patch = self._config["auto_patch"]

        if self.disabled:
            self._disable_tracer_provider()

        if self.auto_patch:
            # NOTE(review): this passes the constructor argument, not the cached
            # `_config["patch_modules"]` - confirm that is intended for reused instances
            self.patch(modules=patch_modules)

        if self._is_xray_provider():
            self._disable_xray_trace_batching()

    def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]):
        """Adds annotation to existing segment or subsegment

        Parameters
        ----------
        key : str
            Annotation key
        value : Union[str, numbers.Number, bool]
            Value for annotation

        Example
        -------
        Custom annotation for a pseudo service named payment

            tracer = Tracer(service="payment")
            tracer.put_annotation("PaymentStatus", "CONFIRMED")
        """
        if self.disabled:
            logger.debug("Tracing has been disabled, aborting put_annotation")
            return

        logger.debug(f"Annotating on key '{key}' with '{value}'")
        self.provider.put_annotation(key=key, value=value)

    def put_metadata(self, key: str, value: Any, namespace: Optional[str] = None):
        """Adds metadata to existing segment or subsegment

        Parameters
        ----------
        key : str
            Metadata key
        value : any
            Value for metadata
        namespace : str, optional
            Namespace that metadata will lie under, by default None
            (the tracer's service name is used when no namespace is given)

        Example
        -------
        Custom metadata for a pseudo service named payment

            tracer = Tracer(service="payment")
            response = collect_payment()
            tracer.put_metadata("Payment collection", response)
        """
        if self.disabled:
            logger.debug("Tracing has been disabled, aborting put_metadata")
            return

        # Fall back to the service name when no (or an empty) namespace is given
        namespace = namespace or self.service
        logger.debug(f"Adding metadata on key '{key}' with '{value}' at namespace '{namespace}'")
        self.provider.put_metadata(key=key, value=value, namespace=namespace)

    def patch(self, modules: Optional[Sequence[str]] = None):
        """Patch modules for instrumentation.

        Patches all supported modules by default if none are given.

        Parameters
        ----------
        modules : Optional[Sequence[str]]
            List of modules to be patched, optional by default
        """
        if self.disabled:
            logger.debug("Tracing has been disabled, aborting patch")
            return

        if modules is None:
            self.provider.patch_all()
        else:
            self.provider.patch(modules)

    def capture_lambda_handler(
        self,
        lambda_handler: Union[Callable[[Dict, Any], Any], Optional[Callable[[Dict, Any, Optional[Dict]], Any]]] = None,
        capture_response: Optional[bool] = None,
        capture_error: Optional[bool] = None,
    ):
        """Decorator to create subsegment for lambda handlers

        As Lambda follows (event, context) signature we can remove some of the boilerplate
        and also capture any exception any Lambda function throws or its response as metadata

        Parameters
        ----------
        lambda_handler : Callable
            Method to annotate on
        capture_response : bool, optional
            Instructs tracer to not include handler's response as metadata
        capture_error : bool, optional
            Instructs tracer to not include handler's error as metadata, by default True

        Example
        -------
        **Lambda function using capture_lambda_handler decorator**

            tracer = Tracer(service="payment")
            @tracer.capture_lambda_handler
            def handler(event, context):
                ...

        **Preventing Tracer from logging response as metadata**

            tracer = Tracer(service="payment")
            @tracer.capture_lambda_handler(capture_response=False)
            def handler(event, context):
                ...

        Raises
        ------
        err
            Exception raised by method
        """
        # If handler is None we've been called with parameters
        # Return a partial function with args filled
        if lambda_handler is None:
            logger.debug("Decorator called with parameters")
            return functools.partial(
                self.capture_lambda_handler, capture_response=capture_response, capture_error=capture_error
            )

        lambda_handler_name = lambda_handler.__name__
        # The env var is read as "true" when unset; the explicit argument is passed as `choice`
        capture_response = resolve_truthy_env_var_choice(
            env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"), choice=capture_response
        )
        capture_error = resolve_truthy_env_var_choice(
            env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"), choice=capture_error
        )

        @functools.wraps(lambda_handler)
        def decorate(event, context, **kwargs):
            with self.provider.in_subsegment(name=f"## {lambda_handler_name}") as subsegment:
                try:
                    logger.debug("Calling lambda handler")
                    response = lambda_handler(event, context, **kwargs)
                    logger.debug("Received lambda handler response successfully")
                    self._add_response_as_metadata(
                        method_name=lambda_handler_name,
                        data=response,
                        subsegment=subsegment,
                        capture_response=capture_response,
                    )
                except Exception as err:
                    logger.exception(f"Exception received from {lambda_handler_name}")
                    self._add_full_exception_as_metadata(
                        method_name=lambda_handler_name, error=err, subsegment=subsegment, capture_error=capture_error
                    )
                    raise
                finally:
                    # Runs on success and on failure: annotate the cold start, then flip the
                    # module-level flag so later invocations report False
                    global is_cold_start
                    logger.debug("Annotating cold start")
                    subsegment.put_annotation(key="ColdStart", value=is_cold_start)

                    if is_cold_start:
                        is_cold_start = False

                    if self.service:
                        subsegment.put_annotation(key="Service", value=self.service)

                return response

        return decorate

    # see #465
    @overload
    def capture_method(self, method: "AnyCallableT") -> "AnyCallableT":
        ...  # pragma: no cover

    @overload
    def capture_method(
        self,
        method: None = None,
        capture_response: Optional[bool] = None,
        capture_error: Optional[bool] = None,
    ) -> Callable[["AnyCallableT"], "AnyCallableT"]:
        ...  # pragma: no cover

    def capture_method(
        self,
        method: Optional[AnyCallableT] = None,
        capture_response: Optional[bool] = None,
        capture_error: Optional[bool] = None,
    ) -> AnyCallableT:
        """Decorator to create subsegment for arbitrary functions

        It also captures both response and exceptions as metadata
        and creates a subsegment named `## <method_module.method_qualifiedname>`
        # see here: [Qualified name for classes and functions](https://peps.python.org/pep-3155/)

        When running [async functions concurrently](https://docs.python.org/3/library/asyncio-task.html#id6),
        methods may impact each other's subsegment, and can trigger an AlreadyEndedException from X-Ray
        due to async nature.

        For this use case, either use `capture_method` only where `asyncio.gather` is called,
        or use `in_subsegment_async` context manager via our escape hatch mechanism - See examples.

        Parameters
        ----------
        method : Callable
            Method to annotate on
        capture_response : bool, optional
            Instructs tracer to not include method's response as metadata
        capture_error : bool, optional
            Instructs tracer to not include handler's error as metadata, by default True

        Example
        -------
        **Custom function using capture_method decorator**

            tracer = Tracer(service="payment")
            @tracer.capture_method
            def some_function()

        **Custom async method using capture_method decorator**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            async def confirm_booking(booking_id: str) -> Dict:
                resp = call_to_booking_service()

                tracer.put_annotation("BookingConfirmation", resp["requestId"])
                tracer.put_metadata("Booking confirmation", resp)

                return resp

            def lambda_handler(event: dict, context: Any) -> Dict:
                booking_id = event.get("booking_id")
                asyncio.run(confirm_booking(booking_id=booking_id))

        **Custom generator function using capture_method decorator**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            def bookings_generator(booking_id):
                resp = call_to_booking_service()
                yield resp[0]
                yield resp[1]

            def lambda_handler(event: dict, context: Any) -> Dict:
                gen = bookings_generator(booking_id=booking_id)
                result = list(gen)

        **Custom generator context manager using capture_method decorator**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            @contextlib.contextmanager
            def booking_actions(booking_id):
                resp = call_to_booking_service()
                yield "example result"
                cleanup_stuff()

            def lambda_handler(event: dict, context: Any) -> Dict:
                booking_id = event.get("booking_id")

                with booking_actions(booking_id=booking_id) as booking:
                    result = booking

        **Tracing nested async calls**

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            @tracer.capture_method
            async def get_identity():
                ...

            @tracer.capture_method
            async def long_async_call():
                ...

            @tracer.capture_method
            async def async_tasks():
                await get_identity()
                ret = await long_async_call()

                return { "task": "done", **ret }

        **Safely tracing concurrent async calls with decorator**

        This may not be needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            async def get_identity():
                async with aioboto3.client("sts") as sts:
                    account = await sts.get_caller_identity()
                    return account

            async def long_async_call():
                ...

            @tracer.capture_method
            async def async_tasks():
                _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

                return { "task": "done", **ret }

        **Safely tracing each concurrent async calls with escape hatch**

        This may not be needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

            from aws_lambda_powertools import Tracer
            tracer = Tracer(service="booking")

            async def get_identity():
                async with tracer.provider.in_subsegment_async("## get_identity"):
                    ...

            async def long_async_call():
                async with tracer.provider.in_subsegment_async("## long_async_call"):
                    ...

            @tracer.capture_method
            async def async_tasks():
                _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

                return { "task": "done", **ret }

        Raises
        ------
        err
            Exception raised by method
        """
        # If method is None we've been called with parameters
        # Return a partial function with args filled
        if method is None:
            logger.debug("Decorator called with parameters")
            return cast(
                AnyCallableT,
                functools.partial(self.capture_method, capture_response=capture_response, capture_error=capture_error),
            )

        # Example: app.ClassA.get_all  # noqa E800
        method_name = f"{method.__module__}.{method.__qualname__}"

        # The env var is read as "true" when unset; the explicit argument is passed as `choice`
        capture_response = resolve_truthy_env_var_choice(
            env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"), choice=capture_response
        )
        capture_error = resolve_truthy_env_var_choice(
            env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"), choice=capture_error
        )

        # Maintenance: Need a factory/builder here to simplify this now
        # Dispatch order: coroutine function, generator function, wrapper whose
        # `__wrapped__` is a generator function, then plain synchronous callable
        if inspect.iscoroutinefunction(method):
            return self._decorate_async_function(
                method=method, capture_response=capture_response, capture_error=capture_error, method_name=method_name
            )
        elif inspect.isgeneratorfunction(method):
            return self._decorate_generator_function(
                method=method, capture_response=capture_response, capture_error=capture_error, method_name=method_name
            )
        elif hasattr(method, "__wrapped__") and inspect.isgeneratorfunction(method.__wrapped__):  # type: ignore
            return self._decorate_generator_function_with_context_manager(
                method=method, capture_response=capture_response, capture_error=capture_error, method_name=method_name
            )
        else:
            return self._decorate_sync_function(
                method=method, capture_response=capture_response, capture_error=capture_error, method_name=method_name
            )

    def _decorate_async_function(
        self,
        method: Callable,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ):
        """Wrap a coroutine function so it is awaited inside an async subsegment `## <method_name>`

        The awaited result is handed to `_add_response_as_metadata`; an exception is logged,
        handed to `_add_full_exception_as_metadata` and re-raised.
        """

        @functools.wraps(method)
        async def decorate(*args, **kwargs):
            async with self.provider.in_subsegment_async(name=f"## {method_name}") as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    response = await method(*args, **kwargs)
                    self._add_response_as_metadata(
                        method_name=method_name, data=response, subsegment=subsegment, capture_response=capture_response
                    )
                except Exception as err:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name, error=err, subsegment=subsegment, capture_error=capture_error
                    )
                    raise

                return response

        return decorate

    def _decorate_generator_function(
        self,
        method: Callable,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ):
        """Wrap a generator function so it is delegated to inside a subsegment `## <method_name>`

        The value produced by `yield from` (the wrapped generator's return value) is handed to
        `_add_response_as_metadata`; an exception is logged, recorded and re-raised.
        """

        @functools.wraps(method)
        def decorate(*args, **kwargs):
            with self.provider.in_subsegment(name=f"## {method_name}") as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    result = yield from method(*args, **kwargs)
                    self._add_response_as_metadata(
                        method_name=method_name, data=result, subsegment=subsegment, capture_response=capture_response
                    )
                except Exception as err:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name, error=err, subsegment=subsegment, capture_error=capture_error
                    )
                    raise

                return result

        return decorate

    def _decorate_generator_function_with_context_manager(
        self,
        method: Callable,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ):
        """Wrap a context-manager factory so its `with` block runs inside a subsegment `## <method_name>`

        The value the wrapped context manager yields is re-yielded to the caller and, once the
        inner `with` block exits, handed to `_add_response_as_metadata`; an exception is logged,
        recorded and re-raised.
        """

        @functools.wraps(method)
        @contextlib.contextmanager
        def decorate(*args, **kwargs):
            with self.provider.in_subsegment(name=f"## {method_name}") as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    with method(*args, **kwargs) as return_val:
                        result = return_val
                        yield result
                    self._add_response_as_metadata(
                        method_name=method_name, data=result, subsegment=subsegment, capture_response=capture_response
                    )
                except Exception as err:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name, error=err, subsegment=subsegment, capture_error=capture_error
                    )
                    raise

        return decorate

    def _decorate_sync_function(
        self,
        method: AnyCallableT,
        capture_response: Optional[Union[bool, str]] = None,
        capture_error: Optional[Union[bool, str]] = None,
        method_name: Optional[str] = None,
    ) -> AnyCallableT:
        """Wrap a synchronous callable so it runs inside a subsegment `## <method_name>`

        The return value is handed to `_add_response_as_metadata`; an exception is logged,
        handed to `_add_full_exception_as_metadata` and re-raised.
        """

        @functools.wraps(method)
        def decorate(*args, **kwargs):
            with self.provider.in_subsegment(name=f"## {method_name}") as subsegment:
                try:
                    logger.debug(f"Calling method: {method_name}")
                    response = method(*args, **kwargs)
                    self._add_response_as_metadata(
                        method_name=method_name,
                        data=response,
                        subsegment=subsegment,
                        capture_response=capture_response,
                    )
                except Exception as err:
                    logger.exception(f"Exception received from '{method_name}' method")
                    self._add_full_exception_as_metadata(
                        method_name=method_name, error=err, subsegment=subsegment, capture_error=capture_error
                    )
                    raise

                return response

        return cast(AnyCallableT, decorate)

    def _add_response_as_metadata(
        self,
        method_name: Optional[str] = None,
        data: Optional[Any] = None,
        subsegment: Optional[BaseSegment] = None,
        capture_response: Optional[Union[bool, str]] = None,
    ):
        """Add response as metadata for given subsegment

        Nothing is recorded when `data` is None, `capture_response` is falsy or
        `subsegment` is None.

        Parameters
        ----------
        method_name : str, optional
            method name to add as metadata key, by default None
        data : Any, optional
            data to add as subsegment metadata, by default None
        subsegment : BaseSegment, optional
            existing subsegment to add metadata on, by default None
        capture_response : bool, optional
            Do not include response as metadata
        """
        if data is None or not capture_response or subsegment is None:
            return

        subsegment.put_metadata(key=f"{method_name} response", value=data, namespace=self.service)

    def _add_full_exception_as_metadata(
        self,
        method_name: str,
        error: Exception,
        subsegment: BaseSegment,
        capture_error: Optional[bool] = None,
    ):
        """Add full exception object as metadata for given subsegment

        Nothing is recorded when `capture_error` is falsy.

        Parameters
        ----------
        method_name : str
            method name to add as metadata key
        error : Exception
            error to add as subsegment metadata
        subsegment : BaseSegment
            existing subsegment to add metadata on
        capture_error : bool, optional
            Do not include error as metadata, by default True
        """
        if not capture_error:
            return

        subsegment.put_metadata(key=f"{method_name} error", value=error, namespace=self.service)

    @staticmethod
    def _disable_tracer_provider():
        """Forcefully disables tracing"""
        logger.debug("Disabling tracer provider...")
        aws_xray_sdk.global_sdk_config.set_sdk_enabled(False)

    @staticmethod
    def _is_tracer_disabled() -> Union[bool, str]:
        """Detects whether trace has been disabled

        Tracing is automatically disabled in the following conditions:

        1. Explicitly disabled via `TRACE_DISABLED` environment variable
        2. Running in Lambda Emulators, or locally where X-Ray Daemon will not be listening
        3. Explicitly disabled via constructor e.g `Tracer(disabled=True)`

        Returns
        -------
        Union[bool, str]
        """
        logger.debug("Verifying whether Tracing has been disabled")
        is_lambda_env = os.getenv(constants.LAMBDA_TASK_ROOT_ENV)
        is_disabled = resolve_truthy_env_var_choice(env=os.getenv(constants.TRACER_DISABLED_ENV, "false"))

        if is_disabled:
            logger.debug("Tracing has been disabled via env var POWERTOOLS_TRACE_DISABLED")
            return is_disabled

        # No Lambda task root in the environment is treated as "not running in Lambda"
        if not is_lambda_env:
            logger.debug("Running outside Lambda env; disabling Tracing")
            return True

        return False

    def __build_config(
        self,
        service: Optional[str] = None,
        disabled: Optional[bool] = None,
        auto_patch: Optional[bool] = None,
        patch_modules: Optional[Sequence[str]] = None,
        provider: Optional[BaseProvider] = None,
    ):
        """Populates Tracer config for new and existing initializations"""
        is_disabled = disabled if disabled is not None else self._is_tracer_disabled()
        is_service = resolve_env_var_choice(choice=service, env=os.getenv(constants.SERVICE_NAME_ENV))

        # Logic: Choose overridden option first, previously cached config, or default if available
        self._config["provider"] = provider or self._config["provider"] or self._patch_xray_provider()
        self._config["auto_patch"] = auto_patch if auto_patch is not None else self._config["auto_patch"]
        self._config["service"] = is_service or self._config["service"]
        # NOTE: because of `or`, a cached True stays True even when the new value is falsy
        self._config["disabled"] = is_disabled or self._config["disabled"]
        self._config["patch_modules"] = patch_modules or self._config["patch_modules"]

    @classmethod
    def _reset_config(cls):
        """Replace the class-level config with a fresh shallow copy of the defaults"""
        cls._config = copy.copy(cls._default_config)

    def _patch_xray_provider(self):
        """Return `xray_recorder` with `patch` and `patch_all` attached as attributes"""
        # Due to Lazy Import, we need to activate `core` attrib via import
        # we also need to include `patch`, `patch_all` methods
        # to ensure patch calls are done via the provider
        from aws_xray_sdk.core import xray_recorder  # type: ignore

        provider = xray_recorder
        provider.patch = aws_xray_sdk.core.patch
        provider.patch_all = aws_xray_sdk.core.patch_all

        return provider

    def _disable_xray_trace_batching(self):
        """Configure X-Ray SDK to send subsegment individually over batching
        Known issue: https://github.com/awslabs/aws-lambda-powertools-python/issues/283
        """
        if self.disabled:
            logger.debug("Tracing has been disabled, aborting streaming override")
            return

        aws_xray_sdk.core.xray_recorder.configure(streaming_threshold=0)

    def _is_xray_provider(self):
        """Return whether "aws_xray_sdk" appears in the provider's `__module__`"""
        return "aws_xray_sdk" in self.provider.__module__

    def ignore_endpoint(self, hostname: Optional[str] = None, urls: Optional[List[str]] = None):
        """If you want to ignore certain httplib requests you can do so based on the hostname or URL that is being
        requested.

        > NOTE: If the provider is not xray, nothing will be added to ignore list

        Documentation
        --------------
        - https://github.com/aws/aws-xray-sdk-python#ignoring-httplib-requests

        Parameters
        ----------
        hostname : Optional, str
            The hostname is matched using the Python fnmatch library which does Unix glob style matching.
        urls: Optional, List[str]
            List of urls to ignore. Example `tracer.ignore_endpoint(urls=["/ignored-url"])`
        """
        if not self._is_xray_provider():
            return

        # Imported here rather than at module top level; only reached for the X-Ray provider
        from aws_xray_sdk.ext.httplib import add_ignored  # type: ignore

        add_ignored(hostname=hostname, urls=urls)
Methods
def capture_lambda_handler(self, lambda_handler: Union[Callable[[Dict[~KT, ~VT], Any], Any], Callable[[Dict[~KT, ~VT], Any, Optional[Dict[~KT, ~VT]]], Any], None] = None, capture_response: Optional[bool] = None, capture_error: Optional[bool] = None)
-
Decorator to create subsegment for lambda handlers
As Lambda follows (event, context) signature we can remove some of the boilerplate and also capture any exception any Lambda function throws or its response as metadata
Parameters
lambda_handler
:Callable
- Method to annotate on
capture_response
:bool
, optional - Whether to include the handler's response as metadata; set to False to exclude it
capture_error
:bool
, optional - Whether to include the handler's error as metadata, by default True
Example
Lambda function using capture_lambda_handler decorator
tracer = Tracer(service="payment") @tracer.capture_lambda_handler def handler(event, context): ...
Preventing Tracer from logging the response as metadata
tracer = Tracer(service="payment") @tracer.capture_lambda_handler(capture_response=False) def handler(event, context): ...
Raises
err
- Exception raised by method
Expand source code
def capture_lambda_handler(
    self,
    lambda_handler: Union[Callable[[Dict, Any], Any], Optional[Callable[[Dict, Any, Optional[Dict]], Any]]] = None,
    capture_response: Optional[bool] = None,
    capture_error: Optional[bool] = None,
):
    """Decorator that traces a Lambda handler inside its own subsegment

    Lambda handlers share the (event, context) signature, so this decorator
    removes the usual boilerplate: it opens a subsegment named after the
    handler, records the handler's response or any exception it raises as
    metadata, and annotates the subsegment with `ColdStart` (plus `Service`
    when a service name is set on the tracer).

    Parameters
    ----------
    lambda_handler : Callable
        Method to annotate on
    capture_response : bool, optional
        Whether to include the handler's response as metadata; set to False to exclude it
    capture_error : bool, optional
        Whether to include the handler's error as metadata, by default True

    Example
    -------
    **Lambda function using capture_lambda_handler decorator**

        tracer = Tracer(service="payment")
        @tracer.capture_lambda_handler
        def handler(event, context):
            ...

    **Preventing Tracer from logging the response as metadata**

        tracer = Tracer(service="payment")
        @tracer.capture_lambda_handler(capture_response=False)
        def handler(event, context):
            ...

    Raises
    ------
    err
        Exception raised by method
    """
    if lambda_handler is None:
        # Used as `@tracer.capture_lambda_handler(...)`: no handler yet, so hand back
        # a partial that remembers the options and waits for the handler itself
        logger.debug("Decorator called with parameters")
        return functools.partial(
            self.capture_lambda_handler, capture_response=capture_response, capture_error=capture_error
        )

    lambda_handler_name = lambda_handler.__name__

    # An explicit argument is passed as `choice`; the env var (default "true") is the fallback input
    response_env = os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true")
    include_response = resolve_truthy_env_var_choice(env=response_env, choice=capture_response)
    error_env = os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true")
    include_error = resolve_truthy_env_var_choice(env=error_env, choice=capture_error)

    @functools.wraps(lambda_handler)
    def traced_handler(event, context, **kwargs):
        # Module-level flag: flipped to False after the first traced invocation
        global is_cold_start

        with self.provider.in_subsegment(name=f"## {lambda_handler_name}") as subsegment:
            try:
                logger.debug("Calling lambda handler")
                result = lambda_handler(event, context, **kwargs)
                logger.debug("Received lambda handler response successfully")
                self._add_response_as_metadata(
                    method_name=lambda_handler_name,
                    data=result,
                    subsegment=subsegment,
                    capture_response=include_response,
                )
            except Exception as exc:
                logger.exception(f"Exception received from {lambda_handler_name}")
                self._add_full_exception_as_metadata(
                    method_name=lambda_handler_name, error=exc, subsegment=subsegment, capture_error=include_error
                )
                # Tracing must never swallow the handler's failure
                raise
            finally:
                # Annotations are written whether the handler succeeded or raised
                logger.debug("Annotating cold start")
                subsegment.put_annotation(key="ColdStart", value=is_cold_start)

                if is_cold_start:
                    is_cold_start = False

                if self.service:
                    subsegment.put_annotation(key="Service", value=self.service)

            return result

    return traced_handler
def capture_method(self, method: Optional[~AnyCallableT] = None, capture_response: Optional[bool] = None, capture_error: Optional[bool] = None) ‑> ~AnyCallableT
-
Decorator to create subsegment for arbitrary functions
It also captures both response and exceptions as metadata and creates a subsegment named
## <method_module.method_qualifiedname>
see here: Qualified name for classes and functions
When running async functions concurrently, methods may impact each other's subsegment, and can trigger an AlreadyEndedException from X-Ray due to async nature.
For this use case, either use capture_method only where asyncio.gather is called, or use the in_subsegment_async context manager via our escape hatch mechanism - See examples.
Parameters
method
:Callable
- Method to annotate on
capture_response
:bool
, optional - Whether to include the method's response as metadata; set to False to exclude it
capture_error
:bool
, optional - Whether to include the method's error as metadata, by default True
Example
Custom function using capture_method decorator
tracer = Tracer(service="payment") @tracer.capture_method def some_function(): ...
Custom async method using capture_method decorator
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method async def confirm_booking(booking_id: str) -> Dict: resp = call_to_booking_service() tracer.put_annotation("BookingConfirmation", resp["requestId"]) tracer.put_metadata("Booking confirmation", resp) return resp def lambda_handler(event: dict, context: Any) -> Dict: booking_id = event.get("booking_id") asyncio.run(confirm_booking(booking_id=booking_id))
Custom generator function using capture_method decorator
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method def bookings_generator(booking_id): resp = call_to_booking_service() yield resp[0] yield resp[1] def lambda_handler(event: dict, context: Any) -> Dict: booking_id = event.get("booking_id") gen = bookings_generator(booking_id=booking_id) result = list(gen)
Custom generator context manager using capture_method decorator
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method @contextlib.contextmanager def booking_actions(booking_id): resp = call_to_booking_service() yield "example result" cleanup_stuff() def lambda_handler(event: dict, context: Any) -> Dict: booking_id = event.get("booking_id") with booking_actions(booking_id=booking_id) as booking: result = booking
Tracing nested async calls
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method async def get_identity(): ... @tracer.capture_method async def long_async_call(): ... @tracer.capture_method async def async_tasks(): await get_identity() ret = await long_async_call() return { "task": "done", **ret }
Safely tracing concurrent async calls with decorator
This may not be needed once this bug is closed
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") async def get_identity(): async with aioboto3.client("sts") as sts: account = await sts.get_caller_identity() return account async def long_async_call(): ... @tracer.capture_method async def async_tasks(): _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True) return { "task": "done", **ret }
Safely tracing each concurrent async calls with escape hatch
This may not be needed once this bug is closed
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") async def get_identity(): async with tracer.provider.in_subsegment_async("## get_identity"): ... async def long_async_call(): async with tracer.provider.in_subsegment_async("## long_async_call"): ... @tracer.capture_method async def async_tasks(): _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True) return { "task": "done", **ret }
Raises
err
- Exception raised by method
Expand source code
def capture_method(
    self,
    method: Optional[AnyCallableT] = None,
    capture_response: Optional[bool] = None,
    capture_error: Optional[bool] = None,
) -> AnyCallableT:
    """Decorator to create subsegment for arbitrary functions

    It also captures both response and exceptions as metadata
    and creates a subsegment named `## <method_module.method_qualifiedname>`
    # see here: [Qualified name for classes and functions](https://peps.python.org/pep-3155/)

    When running [async functions concurrently](https://docs.python.org/3/library/asyncio-task.html#id6),
    methods may impact each other's subsegment, and can trigger an AlreadyEndedException
    from X-Ray due to async nature.

    For this use case, either use `capture_method` only where `asyncio.gather` is called,
    or use the `in_subsegment_async` context manager via our escape hatch mechanism - See examples.

    Parameters
    ----------
    method : Callable
        Method to annotate on
    capture_response : bool, optional
        Whether to include the method's response as metadata; set to False to exclude it
    capture_error : bool, optional
        Whether to include the method's error as metadata, by default True

    Example
    -------
    **Custom function using capture_method decorator**

        tracer = Tracer(service="payment")
        @tracer.capture_method
        def some_function():
            ...

    **Custom async method using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        async def confirm_booking(booking_id: str) -> Dict:
            resp = call_to_booking_service()

            tracer.put_annotation("BookingConfirmation", resp["requestId"])
            tracer.put_metadata("Booking confirmation", resp)

            return resp

        def lambda_handler(event: dict, context: Any) -> Dict:
            booking_id = event.get("booking_id")
            asyncio.run(confirm_booking(booking_id=booking_id))

    **Custom generator function using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        def bookings_generator(booking_id):
            resp = call_to_booking_service()
            yield resp[0]
            yield resp[1]

        def lambda_handler(event: dict, context: Any) -> Dict:
            booking_id = event.get("booking_id")
            gen = bookings_generator(booking_id=booking_id)
            result = list(gen)

    **Custom generator context manager using capture_method decorator**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        @contextlib.contextmanager
        def booking_actions(booking_id):
            resp = call_to_booking_service()
            yield "example result"
            cleanup_stuff()

        def lambda_handler(event: dict, context: Any) -> Dict:
            booking_id = event.get("booking_id")

            with booking_actions(booking_id=booking_id) as booking:
                result = booking

    **Tracing nested async calls**

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        @tracer.capture_method
        async def get_identity():
            ...

        @tracer.capture_method
        async def long_async_call():
            ...

        @tracer.capture_method
        async def async_tasks():
            await get_identity()
            ret = await long_async_call()

            return { "task": "done", **ret }

    **Safely tracing concurrent async calls with decorator**

    This may not be needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        async def get_identity():
            async with aioboto3.client("sts") as sts:
                account = await sts.get_caller_identity()
                return account

        async def long_async_call():
            ...

        @tracer.capture_method
        async def async_tasks():
            _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

            return { "task": "done", **ret }

    **Safely tracing each concurrent async calls with escape hatch**

    This may not be needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164)

        from aws_lambda_powertools import Tracer
        tracer = Tracer(service="booking")

        async def get_identity():
            async with tracer.provider.in_subsegment_async("## get_identity"):
                ...

        async def long_async_call():
            async with tracer.provider.in_subsegment_async("## long_async_call"):
                ...

        @tracer.capture_method
        async def async_tasks():
            _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True)

            return { "task": "done", **ret }

    Raises
    ------
    err
        Exception raised by method
    """
    if method is None:
        # Used as `@tracer.capture_method(...)`: no callable yet, so return a partial
        # that remembers the options and waits for the method itself
        logger.debug("Decorator called with parameters")
        deferred = functools.partial(
            self.capture_method, capture_response=capture_response, capture_error=capture_error
        )
        return cast(AnyCallableT, deferred)

    # Example: app.ClassA.get_all  # noqa E800
    method_name = f"{method.__module__}.{method.__qualname__}"

    # An explicit argument is passed as `choice`; the env var (default "true") is the fallback input
    response_env = os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true")
    include_response = resolve_truthy_env_var_choice(env=response_env, choice=capture_response)
    error_env = os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true")
    include_error = resolve_truthy_env_var_choice(env=error_env, choice=capture_error)

    # Pick the wrapper matching the callable's flavour; order matters because a
    # `contextlib.contextmanager` function is only recognisable via `__wrapped__`
    if inspect.iscoroutinefunction(method):
        build_wrapper = self._decorate_async_function
    elif inspect.isgeneratorfunction(method):
        build_wrapper = self._decorate_generator_function
    elif hasattr(method, "__wrapped__") and inspect.isgeneratorfunction(method.__wrapped__):  # type: ignore
        build_wrapper = self._decorate_generator_function_with_context_manager
    else:
        build_wrapper = self._decorate_sync_function

    return build_wrapper(
        method=method, capture_response=include_response, capture_error=include_error, method_name=method_name
    )
def ignore_endpoint(self, hostname: Optional[str] = None, urls: Optional[List[str]] = None)
-
If you want to ignore certain httplib requests you can do so based on the hostname or URL that is being requested.
NOTE: If the provider is not xray, nothing will be added to ignore list
Documentation
- https://github.com/aws/aws-xray-sdk-python#ignoring-httplib-requests
Parameters
hostname
:Optional, str
- The hostname is matched using the Python fnmatch library which does Unix glob style matching.
urls
:Optional, List[str]
- List of urls to ignore. Example
tracer.ignore_endpoint(urls=["/ignored-url"])
Expand source code
def ignore_endpoint(self, hostname: Optional[str] = None, urls: Optional[List[str]] = None):
    """Exclude certain httplib requests from tracing, by hostname or by requested URL.

    > NOTE: If the provider is not xray, nothing will be added to ignore list

    Documentation
    --------------
    - https://github.com/aws/aws-xray-sdk-python#ignoring-httplib-requests

    Parameters
    ----------
    hostname : Optional, str
        The hostname is matched using the Python fnmatch library which does Unix glob style matching.
    urls: Optional, List[str]
        List of urls to ignore. Example `tracer.ignore_endpoint(urls=["/ignored-url"])`
    """
    if self._is_xray_provider():
        # Imported lazily so the X-Ray httplib extension is only loaded when it is actually used
        from aws_xray_sdk.ext.httplib import add_ignored  # type: ignore

        add_ignored(hostname=hostname, urls=urls)
def patch(self, modules: Optional[Sequence[str]] = None)
-
Patch modules for instrumentation.
Patches all supported modules by default if none are given.
Parameters
modules
:Optional[Sequence[str]]
- List of modules to be patched, optional by default
Expand source code
def patch(self, modules: Optional[Sequence[str]] = None):
    """Patch modules for instrumentation.

    When no modules are given, every module supported by the provider is
    patched. Nothing is patched while tracing is disabled.

    Parameters
    ----------
    modules : Optional[Sequence[str]]
        List of modules to be patched, optional by default
    """
    if self.disabled:
        logger.debug("Tracing has been disabled, aborting patch")
        return

    # Only `None` means "patch everything"; an explicit (even empty) sequence is forwarded as-is
    if modules is not None:
        self.provider.patch(modules)
        return

    self.provider.patch_all()
def put_annotation(self, key: str, value: Union[str, numbers.Number, bool])
-
Adds annotation to existing segment or subsegment
Parameters
key
:str
- Annotation key
value
:Union[str, numbers.Number, bool]
- Value for annotation
Example
Custom annotation for a pseudo service named payment
tracer = Tracer(service="payment") tracer.put_annotation("PaymentStatus", "CONFIRMED")
Expand source code
def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]):
    """Adds annotation to existing segment or subsegment

    Does nothing (beyond a debug log) while tracing is disabled.

    Parameters
    ----------
    key : str
        Annotation key
    value : Union[str, numbers.Number, bool]
        Value for annotation

    Example
    -------
    Custom annotation for a pseudo service named payment

        tracer = Tracer(service="payment")
        tracer.put_annotation("PaymentStatus", "CONFIRMED")
    """
    if not self.disabled:
        logger.debug(f"Annotating on key '{key}' with '{value}'")
        self.provider.put_annotation(key=key, value=value)
        return

    logger.debug("Tracing has been disabled, aborting put_annotation")
def put_metadata(self, key: str, value: Any, namespace: Optional[str] = None)
-
Adds metadata to existing segment or subsegment
Parameters
key
:str
- Metadata key
value
:any
- Value for metadata
namespace
:str
, optional- Namespace that metadata will lie under, by default None
Example
Custom metadata for a pseudo service named payment
tracer = Tracer(service="payment") response = collect_payment() tracer.put_metadata("Payment collection", response)
Expand source code
def put_metadata(self, key: str, value: Any, namespace: Optional[str] = None):
    """Adds metadata to existing segment or subsegment

    Does nothing (beyond a debug log) while tracing is disabled.

    Parameters
    ----------
    key : str
        Metadata key
    value : any
        Value for metadata
    namespace : str, optional
        Namespace that metadata will lie under, by default None;
        a falsy namespace falls back to the tracer's service name

    Example
    -------
    Custom metadata for a pseudo service named payment

        tracer = Tracer(service="payment")
        response = collect_payment()
        tracer.put_metadata("Payment collection", response)
    """
    if not self.disabled:
        # Fall back to the tracer's service name when no namespace is supplied
        namespace = namespace if namespace else self.service
        logger.debug(f"Adding metadata on key '{key}' with '{value}' at namespace '{namespace}'")
        self.provider.put_metadata(key=key, value=value, namespace=namespace)
        return

    logger.debug("Tracing has been disabled, aborting put_metadata")