Package aws_lambda_powertools
Top-level package for Lambda Python Powertools.
Sub-modules
aws_lambda_powertools.event_handler
-
Event handler decorators for common Lambda events
aws_lambda_powertools.exceptions
-
Shared exceptions that don't belong to a single utility
aws_lambda_powertools.logging
-
Logging utility
aws_lambda_powertools.metrics
-
CloudWatch Embedded Metric Format utility
aws_lambda_powertools.middleware_factory
-
Utilities to enhance middlewares
aws_lambda_powertools.package_logger
aws_lambda_powertools.shared
-
Internal shared functions. Do not rely on it besides internal usage.
aws_lambda_powertools.tracing
-
Tracing utility
aws_lambda_powertools.utilities
-
General utilities for Powertools
aws_lambda_powertools.warnings
-
Shared warnings that don't belong to a single utility
Functions
def single_metric(name: str, unit: MetricUnit, value: float, resolution: MetricResolution | int = 60, namespace: str | None = None, default_dimensions: dict[str, str] | None = None) ‑> Generator[SingleMetric, None, None]
-
Context manager to simplify creation of a single metric
Example
Creates cold start metric with function_version as dimension
from aws_lambda_powertools import single_metric from aws_lambda_powertools.metrics import MetricUnit from aws_lambda_powertools.metrics import MetricResolution with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, resolution=MetricResolution.Standard, namespace="ServerlessAirline") as metric: metric.add_dimension(name="function_version", value="47")
Same as above but set namespace using environment variable
$ export POWERTOOLS_METRICS_NAMESPACE="ServerlessAirline" from aws_lambda_powertools import single_metric from aws_lambda_powertools.metrics import MetricUnit from aws_lambda_powertools.metrics import MetricResolution with single_metric(name="ColdStart", unit=MetricUnit.Count, value=1, resolution=MetricResolution.Standard) as metric: metric.add_dimension(name="function_version", value="47")
Parameters
name
:str
- Metric name
unit
:MetricUnit
aws_lambda_powertools.helper.models.MetricUnit
resolution
:MetricResolution
aws_lambda_powertools.helper.models.MetricResolution
value
:float
- Metric value
namespace
:str
- Namespace for metrics
default_dimensions
:dict[str, str]
, optional- Metric dimensions as key=value that will always be present
Yields
SingleMetric
- SingleMetric class instance
Raises
MetricUnitError
- When metric metric isn't supported by CloudWatch
MetricResolutionError
- When metric resolution isn't supported by CloudWatch
MetricValueError
- When metric value isn't a number
SchemaValidationError
- When metric object fails EMF schema validation
Classes
class Logger (service: str | None = None, level: str | int | None = None, child: bool = False, sampling_rate: float | None = None, stream: IO[str] | None = None, logger_formatter: PowertoolsFormatter | None = None, logger_handler: logging.Handler | None = None, log_uncaught_exceptions: bool = False, json_serializer: Callable[[dict], str] | None = None, json_deserializer: Callable[[dict | str | bool | int | float], str] | None = None, json_default: Callable[[Any], Any] | None = None, datefmt: str | None = None, use_datetime_directive: bool = False, log_record_order: list[str] | None = None, utc: bool = False, use_rfc3339: bool = False, serialize_stacktrace: bool = True, **kwargs)
-
Creates and setups a logger to format statements in JSON.
Includes service name and any additional key=value into logs It also accepts both service name or level explicitly via env vars
Environment Variables
POWERTOOLS_SERVICE_NAME : str service name POWERTOOLS_LOG_LEVEL: str logging level (e.g. INFO, DEBUG) POWERTOOLS_LOGGER_SAMPLE_RATE: float sampling rate ranging from 0 to 1, 1 being 100% sampling
Parameters
service
:str
, optional- service name to be appended in logs, by default "service_undefined"
level
:str, int optional
- The level to set. Can be a string representing the level name: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' or an integer representing the level value: 10 for 'DEBUG', 20 for 'INFO', 30 for 'WARNING', 40 for 'ERROR', 50 for 'CRITICAL'. by default "INFO"
child
:bool
, optional- create a child Logger named
. , False by default sample_rate
:float
, optional- sample rate for debug calls within execution context defaults to 0.0
stream
:sys.stdout
, optional- valid output for a logging stream, by default sys.stdout
logger_formatter
:PowertoolsFormatter
, optional- custom logging formatter that implements PowertoolsFormatter
logger_handler
:logging.Handler
, optional- custom logging handler e.g. logging.FileHandler("file.log")
log_uncaught_exceptions
:bool, by default False
-
logs uncaught exception using sys.excepthook
See: https://docs.python.org/3/library/sys.html#sys.excepthook
Parameters Propagated To Lambdapowertoolsformatter
datefmt: str, optional String directives (strftime) to format log timestamp using
time
, by default it uses 2021-05-03 11:47:12,494+0200. use_datetime_directive: bool, optional Interpretdatefmt
as a format string fordatetime.datetime.strftime
, rather thantime.strftime
.See <https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior> . This also supports a custom %F directive for milliseconds.
use_rfc3339: bool, optional Whether to use a popular date format that complies with both RFC3339 and ISO8601. e.g., 2022-10-27T16:27:43.738+02:00. json_serializer : Callable, optional function to serialize
obj
to a JSON formattedstr
, by default json.dumps json_deserializer : Callable, optional function to deserializestr
,bytes
, bytearraycontaining a JSON document to a Python
obj`, by default json.loads json_default : Callable, optional function to coerce unserializable values, by defaultstr()
Only used when no custom formatter is set
utc : bool, optional set logging timestamp to UTC, by default False to continue to use local time as per stdlib log_record_order : list, optional set order of log keys when logging, by default ["level", "location", "message", "timestamp"]
Example
Setups structured logging in JSON for Lambda functions with explicit service name
>>> from aws_lambda_powertools import Logger >>> logger = Logger(service="payment") >>> >>> def handler(event, context): logger.info("Hello")
Setups structured logging in JSON for Lambda functions using env vars
$ export POWERTOOLS_SERVICE_NAME="payment" $ export POWERTOOLS_LOGGER_SAMPLE_RATE=0.01 # 1% debug sampling >>> from aws_lambda_powertools import Logger >>> logger = Logger() >>> >>> def handler(event, context): logger.info("Hello")
Append payment_id to previously setup logger
>>> from aws_lambda_powertools import Logger >>> logger = Logger(service="payment") >>> >>> def handler(event, context): logger.append_keys(payment_id=event["payment_id"]) logger.info("Hello")
Create child Logger using logging inheritance via child param
>>> # app.py >>> import another_file >>> from aws_lambda_powertools import Logger >>> logger = Logger(service="payment") >>> >>> # another_file.py >>> from aws_lambda_powertools import Logger >>> logger = Logger(service="payment", child=True)
Logging in UTC timezone
>>> # app.py >>> import logging >>> from aws_lambda_powertools import Logger >>> >>> logger = Logger(service="payment", utc=True)
Brings message as the first key in log statements
>>> # app.py >>> import logging >>> from aws_lambda_powertools import Logger >>> >>> logger = Logger(service="payment", log_record_order=["message"])
Logging to a file instead of standard output for testing
>>> # app.py >>> import logging >>> from aws_lambda_powertools import Logger >>> >>> logger = Logger(service="payment", logger_handler=logging.FileHandler("log.json"))
Raises
InvalidLoggerSamplingRateError
- When sampling rate provided is not a float
Expand source code
class Logger: """Creates and setups a logger to format statements in JSON. Includes service name and any additional key=value into logs It also accepts both service name or level explicitly via env vars Environment variables --------------------- POWERTOOLS_SERVICE_NAME : str service name POWERTOOLS_LOG_LEVEL: str logging level (e.g. INFO, DEBUG) POWERTOOLS_LOGGER_SAMPLE_RATE: float sampling rate ranging from 0 to 1, 1 being 100% sampling Parameters ---------- service : str, optional service name to be appended in logs, by default "service_undefined" level : str, int optional The level to set. Can be a string representing the level name: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' or an integer representing the level value: 10 for 'DEBUG', 20 for 'INFO', 30 for 'WARNING', 40 for 'ERROR', 50 for 'CRITICAL'. by default "INFO" child: bool, optional create a child Logger named <service>.<caller_file_name>, False by default sample_rate: float, optional sample rate for debug calls within execution context defaults to 0.0 stream: sys.stdout, optional valid output for a logging stream, by default sys.stdout logger_formatter: PowertoolsFormatter, optional custom logging formatter that implements PowertoolsFormatter logger_handler: logging.Handler, optional custom logging handler e.g. logging.FileHandler("file.log") log_uncaught_exceptions: bool, by default False logs uncaught exception using sys.excepthook See: https://docs.python.org/3/library/sys.html#sys.excepthook Parameters propagated to LambdaPowertoolsFormatter -------------------------------------------------- datefmt: str, optional String directives (strftime) to format log timestamp using `time`, by default it uses 2021-05-03 11:47:12,494+0200. use_datetime_directive: bool, optional Interpret `datefmt` as a format string for `datetime.datetime.strftime`, rather than `time.strftime`. See https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior . This also supports a custom %F directive for milliseconds. use_rfc3339: bool, optional Whether to use a popular date format that complies with both RFC3339 and ISO8601. e.g., 2022-10-27T16:27:43.738+02:00. json_serializer : Callable, optional function to serialize `obj` to a JSON formatted `str`, by default json.dumps json_deserializer : Callable, optional function to deserialize `str`, `bytes`, bytearray` containing a JSON document to a Python `obj`, by default json.loads json_default : Callable, optional function to coerce unserializable values, by default `str()` Only used when no custom formatter is set utc : bool, optional set logging timestamp to UTC, by default False to continue to use local time as per stdlib log_record_order : list, optional set order of log keys when logging, by default ["level", "location", "message", "timestamp"] Example ------- **Setups structured logging in JSON for Lambda functions with explicit service name** >>> from aws_lambda_powertools import Logger >>> logger = Logger(service="payment") >>> >>> def handler(event, context): logger.info("Hello") **Setups structured logging in JSON for Lambda functions using env vars** $ export POWERTOOLS_SERVICE_NAME="payment" $ export POWERTOOLS_LOGGER_SAMPLE_RATE=0.01 # 1% debug sampling >>> from aws_lambda_powertools import Logger >>> logger = Logger() >>> >>> def handler(event, context): logger.info("Hello") **Append payment_id to previously setup logger** >>> from aws_lambda_powertools import Logger >>> logger = Logger(service="payment") >>> >>> def handler(event, context): logger.append_keys(payment_id=event["payment_id"]) logger.info("Hello") **Create child Logger using logging inheritance via child param** >>> # app.py >>> import another_file >>> from aws_lambda_powertools import Logger >>> logger = Logger(service="payment") >>> >>> # another_file.py >>> from aws_lambda_powertools import Logger >>> logger = Logger(service="payment", child=True) **Logging in UTC timezone** >>> # app.py >>> import logging >>> from aws_lambda_powertools import Logger >>> >>> logger = Logger(service="payment", utc=True) **Brings message as the first key in log statements** >>> # app.py >>> import logging >>> from aws_lambda_powertools import Logger >>> >>> logger = Logger(service="payment", log_record_order=["message"]) **Logging to a file instead of standard output for testing** >>> # app.py >>> import logging >>> from aws_lambda_powertools import Logger >>> >>> logger = Logger(service="payment", logger_handler=logging.FileHandler("log.json")) Raises ------ InvalidLoggerSamplingRateError When sampling rate provided is not a float """ # noqa: E501 def __init__( self, service: str | None = None, level: str | int | None = None, child: bool = False, sampling_rate: float | None = None, stream: IO[str] | None = None, logger_formatter: PowertoolsFormatter | None = None, logger_handler: logging.Handler | None = None, log_uncaught_exceptions: bool = False, json_serializer: Callable[[dict], str] | None = None, json_deserializer: Callable[[dict | str | bool | int | float], str] | None = None, json_default: Callable[[Any], Any] | None = None, datefmt: str | None = None, use_datetime_directive: bool = False, log_record_order: list[str] | None = None, utc: bool = False, use_rfc3339: bool = False, serialize_stacktrace: bool = True, **kwargs, ) -> None: self.service = resolve_env_var_choice( choice=service, env=os.getenv(constants.SERVICE_NAME_ENV, "service_undefined"), ) self.sampling_rate = resolve_env_var_choice( choice=sampling_rate, env=os.getenv(constants.LOGGER_LOG_SAMPLING_RATE), ) self.child = child self.logger_formatter = logger_formatter self._stream = stream or sys.stdout self.logger_handler = logger_handler or logging.StreamHandler(self._stream) self.log_uncaught_exceptions = log_uncaught_exceptions self._is_deduplication_disabled = resolve_truthy_env_var_choice( env=os.getenv(constants.LOGGER_LOG_DEDUPLICATION_ENV, "false"), ) self._default_log_keys = {"service": self.service, "sampling_rate": self.sampling_rate} self._logger = self._get_logger() # NOTE: This is primarily to improve UX, so IDEs can autocomplete LambdaPowertoolsFormatter options # previously, we masked all of them as kwargs thus limiting feature discovery formatter_options = { "json_serializer": json_serializer, "json_deserializer": json_deserializer, "json_default": json_default, "datefmt": datefmt, "use_datetime_directive": use_datetime_directive, "log_record_order": log_record_order, "utc": utc, "use_rfc3339": use_rfc3339, "serialize_stacktrace": serialize_stacktrace, } self._init_logger(formatter_options=formatter_options, log_level=level, **kwargs) if self.log_uncaught_exceptions: logger.debug("Replacing exception hook") sys.excepthook = functools.partial(log_uncaught_exception_hook, logger=self) # Prevent __getattr__ from shielding unknown attribute errors in type checkers # https://github.com/aws-powertools/powertools-lambda-python/issues/1660 if not TYPE_CHECKING: # pragma: no cover def __getattr__(self, name): # Proxy attributes not found to actual logger to support backward compatibility # https://github.com/aws-powertools/powertools-lambda-python/issues/97 return getattr(self._logger, name) def _get_logger(self) -> logging.Logger: """Returns a Logger named {self.service}, or {self.service.filename} for child loggers""" logger_name = self.service if self.child: logger_name = f"{self.service}.{_get_caller_filename()}" return logging.getLogger(logger_name) def _init_logger( self, formatter_options: dict | None = None, log_level: str | int | None = None, **kwargs, ) -> None: """Configures new logger""" # Skip configuration if it's a child logger or a pre-configured logger # to prevent the following: # a) multiple handlers being attached # b) different sampling mechanisms # c) multiple messages from being logged as handlers can be duplicated is_logger_preconfigured = getattr(self._logger, LOGGER_ATTRIBUTE_PRECONFIGURED, False) if self.child or is_logger_preconfigured: return self.setLevel(log_level) self._configure_sampling() self.addHandler(self.logger_handler) self.structure_logs(formatter_options=formatter_options, **kwargs) # Pytest Live Log feature duplicates log records for colored output # but we explicitly add a filter for log deduplication. # This flag disables this protection when you explicit want logs to be duplicated (#262) if not self._is_deduplication_disabled: logger.debug("Adding filter in root logger to suppress child logger records to bubble up") for handler in logging.root.handlers: # It'll add a filter to suppress any child logger from self.service # Example: `Logger(service="order")`, where service is Order # It'll reject all loggers starting with `order` e.g. order.checkout, order.shared handler.addFilter(SuppressFilter(self.service)) # as per bug in #249, we should not be pre-configuring an existing logger # therefore we set a custom attribute in the Logger that will be returned # std logging will return the same Logger with our attribute if name is reused logger.debug(f"Marking logger {self.service} as preconfigured") self._logger.init = True # type: ignore[attr-defined] def _configure_sampling(self) -> None: """Dynamically set log level based on sampling rate Raises ------ InvalidLoggerSamplingRateError When sampling rate provided is not a float """ try: if self.sampling_rate and random.random() <= float(self.sampling_rate): logger.debug("Setting log level to Debug due to sampling rate") self._logger.setLevel(logging.DEBUG) except ValueError: raise InvalidLoggerSamplingRateError( ( f"Expected a float value ranging 0 to 1, but received {self.sampling_rate} instead." "Please review POWERTOOLS_LOGGER_SAMPLE_RATE environment variable." ), ) @overload def inject_lambda_context( self, lambda_handler: AnyCallableT, log_event: bool | None = None, correlation_id_path: str | None = None, clear_state: bool | None = False, ) -> AnyCallableT: ... @overload def inject_lambda_context( self, lambda_handler: None = None, log_event: bool | None = None, correlation_id_path: str | None = None, clear_state: bool | None = False, ) -> Callable[[AnyCallableT], AnyCallableT]: ... def inject_lambda_context( self, lambda_handler: AnyCallableT | None = None, log_event: bool | None = None, correlation_id_path: str | None = None, clear_state: bool | None = False, ) -> Any: """Decorator to capture Lambda contextual info and inject into logger Parameters ---------- clear_state : bool, optional Instructs logger to remove any custom keys previously added lambda_handler : Callable Method to inject the lambda context log_event : bool, optional Instructs logger to log Lambda Event, by default False correlation_id_path: str, optional Optional JMESPath for the correlation_id Environment variables --------------------- POWERTOOLS_LOGGER_LOG_EVENT : str instruct logger to log Lambda Event (e.g. `"true", "True", "TRUE"`) Example ------- **Captures Lambda contextual runtime info (e.g memory, arn, req_id)** from aws_lambda_powertools import Logger logger = Logger(service="payment") @logger.inject_lambda_context def handler(event, context): logger.info("Hello") **Captures Lambda contextual runtime info and logs incoming request** from aws_lambda_powertools import Logger logger = Logger(service="payment") @logger.inject_lambda_context(log_event=True) def handler(event, context): logger.info("Hello") Returns ------- decorate : Callable Decorated lambda handler """ # If handler is None we've been called with parameters # Return a partial function with args filled if lambda_handler is None: logger.debug("Decorator called with parameters") return functools.partial( self.inject_lambda_context, log_event=log_event, correlation_id_path=correlation_id_path, clear_state=clear_state, ) log_event = resolve_truthy_env_var_choice( env=os.getenv(constants.LOGGER_LOG_EVENT_ENV, "false"), choice=log_event, ) @functools.wraps(lambda_handler) def decorate(event, context, *args, **kwargs): lambda_context = build_lambda_context_model(context) cold_start = _is_cold_start() if clear_state: self.structure_logs(cold_start=cold_start, **lambda_context.__dict__) else: self.append_keys(cold_start=cold_start, **lambda_context.__dict__) if correlation_id_path: self.set_correlation_id( jmespath_utils.query(envelope=correlation_id_path, data=event), ) if log_event: logger.debug("Event received") self.info(extract_event_from_common_models(event)) return lambda_handler(event, context, *args, **kwargs) return decorate def info( self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object, ) -> None: extra = extra or {} extra = {**extra, **kwargs} return self._logger.info( msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra, ) def error( self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object, ) -> None: extra = extra or {} extra = {**extra, **kwargs} return self._logger.error( msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra, ) def exception( self, msg: object, *args: object, exc_info: logging._ExcInfoType = True, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object, ) -> None: extra = extra or {} extra = {**extra, **kwargs} return self._logger.exception( msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra, ) def critical( self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object, ) -> None: extra = extra or {} extra = {**extra, **kwargs} return self._logger.critical( msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra, ) def warning( self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object, ) -> None: extra = extra or {} extra = {**extra, **kwargs} return self._logger.warning( msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra, ) def debug( self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object, ) -> None: extra = extra or {} extra = {**extra, **kwargs} return self._logger.debug( msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra, ) def append_keys(self, **additional_keys: object) -> None: self.registered_formatter.append_keys(**additional_keys) def get_current_keys(self) -> dict[str, Any]: return self.registered_formatter.get_current_keys() def remove_keys(self, keys: Iterable[str]) -> None: self.registered_formatter.remove_keys(keys) # These specific thread-safe methods are necessary to manage shared context in concurrent environments. # They prevent race conditions and ensure data consistency across multiple threads. def thread_safe_append_keys(self, **additional_keys: object) -> None: # Append additional key-value pairs to the context safely in a thread-safe manner. self.registered_formatter.thread_safe_append_keys(**additional_keys) def thread_safe_get_current_keys(self) -> dict[str, Any]: # Retrieve the current context keys safely in a thread-safe manner. return self.registered_formatter.thread_safe_get_current_keys() def thread_safe_remove_keys(self, keys: Iterable[str]) -> None: # Remove specified keys from the context safely in a thread-safe manner. self.registered_formatter.thread_safe_remove_keys(keys) def thread_safe_clear_keys(self) -> None: # Clear all keys from the context safely in a thread-safe manner. self.registered_formatter.thread_safe_clear_keys() def structure_logs(self, append: bool = False, formatter_options: dict | None = None, **keys) -> None: """Sets logging formatting to JSON. Optionally, it can append keyword arguments to an existing logger, so it is available across future log statements. Last keyword argument and value wins if duplicated. Parameters ---------- append : bool, optional append keys provided to logger formatter, by default False formatter_options : dict, optional LambdaPowertoolsFormatter options to be propagated, by default {} """ formatter_options = formatter_options or {} # There are 3 operational modes for this method ## 1. Register a Powertools for AWS Lambda (Python) Formatter for the first time ## 2. Append new keys to the current logger formatter; deprecated in favour of append_keys ## 3. Add new keys and discard existing to the registered formatter # Mode 1 log_keys = {**self._default_log_keys, **keys} is_logger_preconfigured = getattr(self._logger, LOGGER_ATTRIBUTE_PRECONFIGURED, False) if not is_logger_preconfigured: formatter = self.logger_formatter or LambdaPowertoolsFormatter(**formatter_options, **log_keys) self.registered_handler.setFormatter(formatter) # when using a custom Powertools for AWS Lambda (Python) Formatter # standard and custom keys that are not Powertools for AWS Lambda (Python) Formatter parameters # should be appended and custom keys that might happen to be Powertools for AWS Lambda (Python) # Formatter parameters should be discarded this prevents adding them as custom keys, for example, # `json_default=<callable>` see https://github.com/aws-powertools/powertools-lambda-python/issues/1263 custom_keys = {k: v for k, v in log_keys.items() if k not in RESERVED_FORMATTER_CUSTOM_KEYS} return self.registered_formatter.append_keys(**custom_keys) # Mode 2 (legacy) if append: # Maintenance: Add deprecation warning for major version return self.append_keys(**keys) # Mode 3 self.registered_formatter.clear_state() self.registered_formatter.thread_safe_clear_keys() self.registered_formatter.append_keys(**log_keys) def set_correlation_id(self, value: str | None) -> None: """Sets the correlation_id in the logging json Parameters ---------- value : str, optional Value for the correlation id. None will remove the correlation_id """ self.append_keys(correlation_id=value) def get_correlation_id(self) -> str | None: """Gets the correlation_id in the logging json Returns ------- str, optional Value for the correlation id """ if isinstance(self.registered_formatter, LambdaPowertoolsFormatter): return self.registered_formatter.log_format.get("correlation_id") return None def setLevel(self, level: str | int | None) -> None: return self._logger.setLevel(self._determine_log_level(level)) def addHandler(self, handler: logging.Handler) -> None: return self._logger.addHandler(handler) def addFilter(self, filter: logging._FilterType) -> None: # noqa: A002 # filter built-in usage return self._logger.addFilter(filter) def removeFilter(self, filter: logging._FilterType) -> None: # noqa: A002 # filter built-in usage return self._logger.removeFilter(filter) @property def registered_handler(self) -> logging.Handler: """Convenience property to access the first logger handler""" # We ignore mypy here because self.child encodes whether or not self._logger.parent is # None, mypy can't see this from context but we can handlers = self._logger.parent.handlers if self.child else self._logger.handlers # type: ignore[union-attr] return handlers[0] @property def registered_formatter(self) -> BasePowertoolsFormatter: """Convenience property to access the first logger formatter""" return self.registered_handler.formatter # type: ignore[return-value] @property def log_level(self) -> int: return self._logger.level @property def name(self) -> str: return self._logger.name @property def handlers(self) -> list[logging.Handler]: """List of registered logging handlers Notes ----- Looking for the first configured handler? Use registered_handler property instead. """ return self._logger.handlers def _get_aws_lambda_log_level(self) -> str | None: """ Retrieve the log level for AWS Lambda from the Advanced Logging Controls feature. Returns: str | None: The corresponding logging level. """ return constants.LAMBDA_ADVANCED_LOGGING_LEVELS.get(os.getenv(constants.LAMBDA_LOG_LEVEL_ENV)) def _get_powertools_log_level(self, level: str | int | None) -> str | None: """Retrieve the log level for Powertools from the environment variable or level parameter. If log level is an integer, we convert to its respective string level `logging.getLevelName()`. If no log level is provided, we check env vars for the log level: POWERTOOLS_LOG_LEVEL_ENV and POWERTOOLS_LOG_LEVEL_LEGACY_ENV. Parameters: ----------- level : str | int | None The specified log level as a string, integer, or None. Environment variables --------------------- POWERTOOLS_LOG_LEVEL : str log level (e.g: INFO, DEBUG, WARNING, ERROR, CRITICAL) LOG_LEVEL (Legacy) : str log level (e.g: INFO, DEBUG, WARNING, ERROR, CRITICAL) Returns: -------- str | None: The corresponding logging level. Returns None if the log level is not explicitly specified. """ # noqa E501 # Extract log level from Powertools Logger env vars log_level_env = os.getenv(constants.POWERTOOLS_LOG_LEVEL_ENV) or os.getenv( constants.POWERTOOLS_LOG_LEVEL_LEGACY_ENV, ) # If level is an int (logging.INFO), return its respective string ("INFO") if isinstance(level, int): return logging.getLevelName(level) return level or log_level_env def _determine_log_level(self, level: str | int | None) -> str | int: """Determine the effective log level considering Lambda and Powertools preferences. It emits an UserWarning if Lambda ALC log level is lower than Logger log level. Parameters: ----------- level: str | int | None The specified log level as a string, integer, or None. Returns: ---------- str | int: The effective logging level. """ # This function consider the following order of precedence: # 1 - If a log level is set using AWS Lambda Advanced Logging Controls, it sets it. # 2 - If a log level is passed to the constructor, it sets it # 3 - If a log level is set via setLevel, it sets it. # 4 - If a log level is set via Powertools env variables, it sets it. # 5 - If none of the above is true, the default log level applies INFO. lambda_log_level = self._get_aws_lambda_log_level() powertools_log_level = self._get_powertools_log_level(level) if powertools_log_level and lambda_log_level: # If Powertools log level is set and higher than AWS Lambda Advanced Logging Controls, emit a warning if logging.getLevelName(lambda_log_level) > logging.getLevelName(powertools_log_level): warnings.warn( f"Current log level ({powertools_log_level}) does not match AWS Lambda Advanced Logging Controls " f"minimum log level ({lambda_log_level}). This can lead to data loss, consider adjusting them.", UserWarning, stacklevel=2, ) # AWS Lambda Advanced Logging Controls takes precedence over Powertools log level and we use this if lambda_log_level: return lambda_log_level # Check if Powertools log level is None, which means it's not set # We assume INFO as the default log level if powertools_log_level is None: return logging.INFO # Powertools log level is set, we use this return powertools_log_level.upper()
Instance variables
prop handlers : list[logging.Handler]
-
List of registered logging handlers
Notes
Looking for the first configured handler? Use registered_handler property instead.
Expand source code
@property def handlers(self) -> list[logging.Handler]: """List of registered logging handlers Notes ----- Looking for the first configured handler? Use registered_handler property instead. """ return self._logger.handlers
prop log_level : int
-
Expand source code
@property def log_level(self) -> int: return self._logger.level
prop name : str
-
Expand source code
@property def name(self) -> str: return self._logger.name
prop registered_formatter : BasePowertoolsFormatter
-
Convenience property to access the first logger formatter
Expand source code
@property def registered_formatter(self) -> BasePowertoolsFormatter: """Convenience property to access the first logger formatter""" return self.registered_handler.formatter # type: ignore[return-value]
prop registered_handler : logging.Handler
-
Convenience property to access the first logger handler
Expand source code
@property def registered_handler(self) -> logging.Handler: """Convenience property to access the first logger handler""" # We ignore mypy here because self.child encodes whether or not self._logger.parent is # None, mypy can't see this from context but we can handlers = self._logger.parent.handlers if self.child else self._logger.handlers # type: ignore[union-attr] return handlers[0]
Methods
def addFilter(self, filter: logging._FilterType)
def addHandler(self, handler: logging.Handler) ‑> None
def append_keys(self, **additional_keys: object) ‑> None
def critical(self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object)
def debug(self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object)
def error(self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object)
def exception(self, msg: object, *args: object, exc_info: logging._ExcInfoType = True, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object)
def get_correlation_id(self) ‑> str | None
-
Gets the correlation_id in the logging json
Returns
str
, optional- Value for the correlation id
def get_current_keys(self) ‑> dict[str, typing.Any]
def info(self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object)
def inject_lambda_context(self, lambda_handler: AnyCallableT | None = None, log_event: bool | None = None, correlation_id_path: str | None = None, clear_state: bool | None = False)
-
Decorator to capture Lambda contextual info and inject into logger
Parameters
clear_state
:bool
, optional- Instructs logger to remove any custom keys previously added
lambda_handler
:Callable
- Method to inject the lambda context
log_event
:bool
, optional- Instructs logger to log Lambda Event, by default False
correlation_id_path
:str
, optional- Optional JMESPath for the correlation_id
Environment Variables
POWERTOOLS_LOGGER_LOG_EVENT : str instruct logger to log Lambda Event (e.g.
"true", "True", "TRUE"
)Example
Captures Lambda contextual runtime info (e.g memory, arn, req_id)
from aws_lambda_powertools import Logger logger = Logger(service="payment") @logger.inject_lambda_context def handler(event, context): logger.info("Hello")
Captures Lambda contextual runtime info and logs incoming request
from aws_lambda_powertools import Logger logger = Logger(service="payment") @logger.inject_lambda_context(log_event=True) def handler(event, context): logger.info("Hello")
Returns
decorate
:Callable
- Decorated lambda handler
def removeFilter(self, filter: logging._FilterType)
def remove_keys(self, keys: Iterable[str]) ‑> None
def setLevel(self, level: str | int | None) ‑> None
def set_correlation_id(self, value: str | None) ‑> None
-
Sets the correlation_id in the logging json
Parameters
value
:str
, optional- Value for the correlation id. None will remove the correlation_id
def structure_logs(self, append: bool = False, formatter_options: dict | None = None, **keys) ‑> None
-
Sets logging formatting to JSON.
Optionally, it can append keyword arguments to an existing logger, so it is available across future log statements.
Last keyword argument and value wins if duplicated.
Parameters
append
:bool
, optional- append keys provided to logger formatter, by default False
formatter_options
:dict
, optional- LambdaPowertoolsFormatter options to be propagated, by default {}
def thread_safe_append_keys(self, **additional_keys: object) ‑> None
def thread_safe_clear_keys(self) ‑> None
def thread_safe_get_current_keys(self) ‑> dict[str, typing.Any]
def thread_safe_remove_keys(self, keys: Iterable[str]) ‑> None
def warning(self, msg: object, *args: object, exc_info: logging._ExcInfoType = None, stack_info: bool = False, stacklevel: int = 2, extra: Mapping[str, object] | None = None, **kwargs: object)
class Metrics (service: str | None = None, namespace: str | None = None, provider: AmazonCloudWatchEMFProvider | None = None)
-
Metrics create an CloudWatch EMF object with up to 100 metrics
Use Metrics when you need to create multiple metrics that have dimensions in common (e.g. service_name="payment").
Metrics up to 100 metrics in memory and are shared across all its instances. That means it can be safely instantiated outside of a Lambda function, or anywhere else.
A decorator (log_metrics) is provided so metrics are published at the end of its execution. If more than 100 metrics are added at a given function execution, these metrics are serialized and published before adding a given metric to prevent metric truncation.
Example
Creates a few metrics and publish at the end of a function execution
from aws_lambda_powertools import Metrics metrics = Metrics(namespace="ServerlessAirline", service="payment") @metrics.log_metrics(capture_cold_start_metric=True) def lambda_handler(): metrics.add_metric(name="BookingConfirmation", unit="Count", value=1) metrics.add_dimension(name="function_version", value="$LATEST") return True
Environment Variables
POWERTOOLS_METRICS_NAMESPACE : str metric namespace POWERTOOLS_SERVICE_NAME : str service name used for default dimension
Parameters
service
:str
, optional- service name to be used as metric dimension, by default "service_undefined"
namespace
:str
, optional- Namespace for metrics
provider
:AmazonCloudWatchEMFProvider
, optional- Pre-configured AmazonCloudWatchEMFProvider provider
Raises
MetricUnitError
- When metric unit isn't supported by CloudWatch
MetricResolutionError
- When metric resolution isn't supported by CloudWatch
MetricValueError
- When metric value isn't a number
SchemaValidationError
- When metric object fails EMF schema validation
Expand source code
class Metrics: """Metrics create an CloudWatch EMF object with up to 100 metrics Use Metrics when you need to create multiple metrics that have dimensions in common (e.g. service_name="payment"). Metrics up to 100 metrics in memory and are shared across all its instances. That means it can be safely instantiated outside of a Lambda function, or anywhere else. A decorator (log_metrics) is provided so metrics are published at the end of its execution. If more than 100 metrics are added at a given function execution, these metrics are serialized and published before adding a given metric to prevent metric truncation. Example ------- **Creates a few metrics and publish at the end of a function execution** from aws_lambda_powertools import Metrics metrics = Metrics(namespace="ServerlessAirline", service="payment") @metrics.log_metrics(capture_cold_start_metric=True) def lambda_handler(): metrics.add_metric(name="BookingConfirmation", unit="Count", value=1) metrics.add_dimension(name="function_version", value="$LATEST") return True Environment variables --------------------- POWERTOOLS_METRICS_NAMESPACE : str metric namespace POWERTOOLS_SERVICE_NAME : str service name used for default dimension Parameters ---------- service : str, optional service name to be used as metric dimension, by default "service_undefined" namespace : str, optional Namespace for metrics provider: AmazonCloudWatchEMFProvider, optional Pre-configured AmazonCloudWatchEMFProvider provider Raises ------ MetricUnitError When metric unit isn't supported by CloudWatch MetricResolutionError When metric resolution isn't supported by CloudWatch MetricValueError When metric value isn't a number SchemaValidationError When metric object fails EMF schema validation """ # NOTE: We use class attrs to share metrics data across instances # this allows customers to initialize Metrics() throughout their code base (and middlewares) # and not get caught by accident with metrics data loss, or data deduplication # e.g., m1 and m2 add metric ProductCreated, however m1 has 'version' dimension but m2 doesn't # Result: ProductCreated is created twice as we now have 2 different EMF blobs _metrics: dict[str, Any] = {} _dimensions: dict[str, str] = {} _metadata: dict[str, Any] = {} _default_dimensions: dict[str, Any] = {} def __init__( self, service: str | None = None, namespace: str | None = None, provider: AmazonCloudWatchEMFProvider | None = None, ): self.metric_set = self._metrics self.metadata_set = self._metadata self.default_dimensions = self._default_dimensions self.dimension_set = self._dimensions self.dimension_set.update(**self._default_dimensions) if provider is None: self.provider = AmazonCloudWatchEMFProvider( namespace=namespace, service=service, metric_set=self.metric_set, dimension_set=self.dimension_set, metadata_set=self.metadata_set, default_dimensions=self._default_dimensions, ) else: self.provider = provider def add_metric( self, name: str, unit: MetricUnit | str, value: float, resolution: MetricResolution | int = 60, ) -> None: self.provider.add_metric(name=name, unit=unit, value=value, resolution=resolution) def add_dimension(self, name: str, value: str) -> None: self.provider.add_dimension(name=name, value=value) def serialize_metric_set( self, metrics: dict | None = None, dimensions: dict | None = None, metadata: dict | None = None, ) -> CloudWatchEMFOutput: return self.provider.serialize_metric_set(metrics=metrics, dimensions=dimensions, metadata=metadata) def add_metadata(self, key: str, value: Any) -> None: self.provider.add_metadata(key=key, value=value) def set_timestamp(self, timestamp: int): """ Set the timestamp for the metric. Parameters: ----------- timestamp: int | datetime.datetime The timestamp to create the metric. If an integer is provided, it is assumed to be the epoch time in milliseconds. If a datetime object is provided, it will be converted to epoch time in milliseconds. """ self.provider.set_timestamp(timestamp=timestamp) def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None: self.provider.flush_metrics(raise_on_empty_metrics=raise_on_empty_metrics) def log_metrics( self, lambda_handler: AnyCallableT | None = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, default_dimensions: dict[str, str] | None = None, **kwargs, ): return self.provider.log_metrics( lambda_handler=lambda_handler, capture_cold_start_metric=capture_cold_start_metric, raise_on_empty_metrics=raise_on_empty_metrics, default_dimensions=default_dimensions, **kwargs, ) def set_default_dimensions(self, **dimensions) -> None: self.provider.set_default_dimensions(**dimensions) """Persist dimensions across Lambda invocations Parameters ---------- dimensions : dict[str, Any], optional metric dimensions as key=value Example ------- **Sets some default dimensions that will always be present across metrics and invocations** from aws_lambda_powertools import Metrics metrics = Metrics(namespace="ServerlessAirline", service="payment") metrics.set_default_dimensions(environment="demo", another="one") @metrics.log_metrics() def lambda_handler(): return True """ for name, value in dimensions.items(): self.add_dimension(name, value) self.default_dimensions.update(**dimensions) def clear_default_dimensions(self) -> None: self.provider.default_dimensions.clear() self.default_dimensions.clear() def clear_metrics(self) -> None: self.provider.clear_metrics() # We now allow customers to bring their own instance # of the AmazonCloudWatchEMFProvider provider # So we need to define getter/setter for namespace and service properties # To access these attributes on the provider instance. @property def namespace(self): return self.provider.namespace @namespace.setter def namespace(self, namespace): self.provider.namespace = namespace @property def service(self): return self.provider.service @service.setter def service(self, service): self.provider.service = service
Instance variables
prop namespace
-
Expand source code
@property def namespace(self): return self.provider.namespace
prop service
-
Expand source code
@property def service(self): return self.provider.service
Methods
def add_dimension(self, name: str, value: str) ‑> None
def add_metadata(self, key: str, value: Any) ‑> None
def add_metric(self, name: str, unit: MetricUnit | str, value: float, resolution: MetricResolution | int = 60)
def clear_default_dimensions(self) ‑> None
def clear_metrics(self) ‑> None
def flush_metrics(self, raise_on_empty_metrics: bool = False) ‑> None
def log_metrics(self, lambda_handler: AnyCallableT | None = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, default_dimensions: dict[str, str] | None = None, **kwargs)
def serialize_metric_set(self, metrics: dict | None = None, dimensions: dict | None = None, metadata: dict | None = None)
def set_default_dimensions(self, **dimensions) ‑> None
def set_timestamp(self, timestamp: int)
-
Set the timestamp for the metric.
Parameters:
timestamp: int | datetime.datetime The timestamp to create the metric. If an integer is provided, it is assumed to be the epoch time in milliseconds. If a datetime object is provided, it will be converted to epoch time in milliseconds.
class Tracer (service: str | None = None, disabled: bool | None = None, auto_patch: bool | None = None, patch_modules: Sequence[str] | None = None, provider: BaseProvider | None = None)
-
Tracer using AWS-XRay to provide decorators with known defaults for Lambda functions
When running locally, it detects whether it's running via SAM CLI, and if it is it returns dummy segments/subsegments instead.
By default, it patches all available libraries supported by X-Ray SDK. Patching is automatically disabled when running locally via SAM CLI or by any other means.
Ref: https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/thirdparty.html
Tracer keeps a copy of its configuration as it can be instantiated more than once. This is useful when you are using your own middlewares and want to utilize an existing Tracer. Make sure to set
auto_patch=False
in subsequent Tracer instances to avoid double patching.Environment Variables
POWERTOOLS_TRACE_DISABLED : str disable tracer (e.g.
"true", "True", "TRUE"
) POWERTOOLS_SERVICE_NAME : str service name POWERTOOLS_TRACER_CAPTURE_RESPONSE : str disable auto-capture response as metadata (e.g."true", "True", "TRUE"
) POWERTOOLS_TRACER_CAPTURE_ERROR : str disable auto-capture error as metadata (e.g."true", "True", "TRUE"
)Parameters
service
:str
- Service name that will be appended in all tracing metadata
auto_patch
:bool
- Patch existing imported modules during initialization, by default True
disabled
:bool
- Flag to explicitly disable tracing, useful when running/testing locally
Env POWERTOOLS_TRACE_DISABLED="true"
patch_modules
:Sequence[str] | None
- Tuple of modules supported by tracing provider to patch, by default all modules are patched
provider
:BaseProvider
- Tracing provider, by default it is aws_xray_sdk.core.xray_recorder
Returns
Tracer
- Tracer instance with imported modules patched
Example
A Lambda function using Tracer
from aws_lambda_powertools import Tracer tracer = Tracer(service="greeting") @tracer.capture_method def greeting(name: str) -> dict: return { "name": name } @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> dict: print("Received event from Lambda...") response = greeting(name="Heitor") return response
Booking Lambda function using Tracer that adds additional annotation/metadata
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method def confirm_booking(booking_id: str) -> dict: resp = add_confirmation(booking_id) tracer.put_annotation("BookingConfirmation", resp["requestId"]) tracer.put_metadata("Booking confirmation", resp) return resp @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> dict: print("Received event from Lambda...") booking_id = event.get("booking_id") response = confirm_booking(booking_id=booking_id) return response
A Lambda function using service name via POWERTOOLS_SERVICE_NAME
export POWERTOOLS_SERVICE_NAME="booking" from aws_lambda_powertools import Tracer tracer = Tracer() @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> dict: print("Received event from Lambda...") response = greeting(name="Lessa") return response
Reuse an existing instance of Tracer anywhere in the code
# lambda_handler.py from aws_lambda_powertools import Tracer tracer = Tracer() @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> dict: ... # utils.py from aws_lambda_powertools import Tracer tracer = Tracer() ...
Limitations
- Async handler not supported
Expand source code
class Tracer: """Tracer using AWS-XRay to provide decorators with known defaults for Lambda functions When running locally, it detects whether it's running via SAM CLI, and if it is it returns dummy segments/subsegments instead. By default, it patches all available libraries supported by X-Ray SDK. Patching is automatically disabled when running locally via SAM CLI or by any other means. \n Ref: https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/thirdparty.html Tracer keeps a copy of its configuration as it can be instantiated more than once. This is useful when you are using your own middlewares and want to utilize an existing Tracer. Make sure to set `auto_patch=False` in subsequent Tracer instances to avoid double patching. Environment variables --------------------- POWERTOOLS_TRACE_DISABLED : str disable tracer (e.g. `"true", "True", "TRUE"`) POWERTOOLS_SERVICE_NAME : str service name POWERTOOLS_TRACER_CAPTURE_RESPONSE : str disable auto-capture response as metadata (e.g. `"true", "True", "TRUE"`) POWERTOOLS_TRACER_CAPTURE_ERROR : str disable auto-capture error as metadata (e.g. `"true", "True", "TRUE"`) Parameters ---------- service: str Service name that will be appended in all tracing metadata auto_patch: bool Patch existing imported modules during initialization, by default True disabled: bool Flag to explicitly disable tracing, useful when running/testing locally `Env POWERTOOLS_TRACE_DISABLED="true"` patch_modules: Sequence[str] | None Tuple of modules supported by tracing provider to patch, by default all modules are patched provider: BaseProvider Tracing provider, by default it is aws_xray_sdk.core.xray_recorder Returns ------- Tracer Tracer instance with imported modules patched Example ------- **A Lambda function using Tracer** from aws_lambda_powertools import Tracer tracer = Tracer(service="greeting") @tracer.capture_method def greeting(name: str) -> dict: return { "name": name } @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> dict: print("Received event from Lambda...") response = greeting(name="Heitor") return response **Booking Lambda function using Tracer that adds additional annotation/metadata** from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method def confirm_booking(booking_id: str) -> dict: resp = add_confirmation(booking_id) tracer.put_annotation("BookingConfirmation", resp["requestId"]) tracer.put_metadata("Booking confirmation", resp) return resp @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> dict: print("Received event from Lambda...") booking_id = event.get("booking_id") response = confirm_booking(booking_id=booking_id) return response **A Lambda function using service name via POWERTOOLS_SERVICE_NAME** export POWERTOOLS_SERVICE_NAME="booking" from aws_lambda_powertools import Tracer tracer = Tracer() @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> dict: print("Received event from Lambda...") response = greeting(name="Lessa") return response **Reuse an existing instance of Tracer anywhere in the code** # lambda_handler.py from aws_lambda_powertools import Tracer tracer = Tracer() @tracer.capture_lambda_handler def handler(event: dict, context: Any) -> dict: ... # utils.py from aws_lambda_powertools import Tracer tracer = Tracer() ... Limitations ----------- * Async handler not supported """ _default_config: dict[str, Any] = { "service": "", "disabled": False, "auto_patch": True, "patch_modules": None, "provider": None, } _config = copy.copy(_default_config) def __init__( self, service: str | None = None, disabled: bool | None = None, auto_patch: bool | None = None, patch_modules: Sequence[str] | None = None, provider: BaseProvider | None = None, ): self.__build_config( service=service, disabled=disabled, auto_patch=auto_patch, patch_modules=patch_modules, provider=provider, ) self.provider: BaseProvider = self._config["provider"] self.disabled = self._config["disabled"] self.service = self._config["service"] self.auto_patch = self._config["auto_patch"] if self.disabled: self._disable_tracer_provider() if self.auto_patch: self.patch(modules=patch_modules) if self._is_xray_provider(): self._disable_xray_trace_batching() def put_annotation(self, key: str, value: str | numbers.Number | bool): """Adds annotation to existing segment or subsegment Parameters ---------- key : str Annotation key value : str | numbers.Number | bool Value for annotation Example ------- Custom annotation for a pseudo service named payment tracer = Tracer(service="payment") tracer.put_annotation("PaymentStatus", "CONFIRMED") """ if self.disabled: logger.debug("Tracing has been disabled, aborting put_annotation") return logger.debug(f"Annotating on key '{key}' with '{value}'") self.provider.put_annotation(key=key, value=value) def put_metadata(self, key: str, value: Any, namespace: str | None = None): """Adds metadata to existing segment or subsegment Parameters ---------- key : str Metadata key value : any Value for metadata namespace : str, optional Namespace that metadata will lie under, by default None Example ------- Custom metadata for a pseudo service named payment tracer = Tracer(service="payment") response = collect_payment() tracer.put_metadata("Payment collection", response) """ if self.disabled: logger.debug("Tracing has been disabled, aborting put_metadata") return namespace = namespace or self.service logger.debug(f"Adding metadata on key '{key}' with '{value}' at namespace '{namespace}'") self.provider.put_metadata(key=key, value=value, namespace=namespace) def patch(self, modules: Sequence[str] | None = None): """Patch modules for instrumentation. Patches all supported modules by default if none are given. Parameters ---------- modules : Sequence[str] | None List of modules to be patched, optional by default """ if self.disabled: logger.debug("Tracing has been disabled, aborting patch") return if modules is None: self.provider.patch_all() else: self.provider.patch(modules) def capture_lambda_handler( self, lambda_handler: Callable[[T, Any], Any] | Callable[[T, Any, Any], Any] | None = None, capture_response: bool | None = None, capture_error: bool | None = None, ): """Decorator to create subsegment for lambda handlers As Lambda follows (event, context) signature we can remove some of the boilerplate and also capture any exception any Lambda function throws or its response as metadata Parameters ---------- lambda_handler : Callable Method to annotate on capture_response : bool, optional Instructs tracer to not include handler's response as metadata capture_error : bool, optional Instructs tracer to not include handler's error as metadata, by default True Example ------- **Lambda function using capture_lambda_handler decorator** tracer = Tracer(service="payment") @tracer.capture_lambda_handler def handler(event, context): ... **Preventing Tracer to log response as metadata** tracer = Tracer(service="payment") @tracer.capture_lambda_handler(capture_response=False) def handler(event, context): ... Raises ------ err Exception raised by method """ # If handler is None we've been called with parameters # Return a partial function with args filled if lambda_handler is None: logger.debug("Decorator called with parameters") return functools.partial( self.capture_lambda_handler, capture_response=capture_response, capture_error=capture_error, ) lambda_handler_name = lambda_handler.__name__ capture_response = resolve_truthy_env_var_choice( env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"), choice=capture_response, ) capture_error = resolve_truthy_env_var_choice( env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"), choice=capture_error, ) @functools.wraps(lambda_handler) def decorate(event, context, **kwargs): with self.provider.in_subsegment(name=f"## {lambda_handler_name}") as subsegment: try: logger.debug("Calling lambda handler") response = lambda_handler(event, context, **kwargs) logger.debug("Received lambda handler response successfully") self._add_response_as_metadata( method_name=lambda_handler_name, data=response, subsegment=subsegment, capture_response=capture_response, ) except Exception as err: logger.exception(f"Exception received from {lambda_handler_name}") self._add_full_exception_as_metadata( method_name=lambda_handler_name, error=err, subsegment=subsegment, capture_error=capture_error, ) raise finally: global is_cold_start logger.debug("Annotating cold start") subsegment.put_annotation(key="ColdStart", value=is_cold_start) if is_cold_start: is_cold_start = False if self.service: subsegment.put_annotation(key="Service", value=self.service) return response return decorate # see #465 @overload def capture_method(self, method: AnyCallableT) -> AnyCallableT: ... # pragma: no cover @overload def capture_method( self, method: None = None, capture_response: bool | None = None, capture_error: bool | None = None, ) -> Callable[[AnyCallableT], AnyCallableT]: ... # pragma: no cover def capture_method( self, method: AnyCallableT | None = None, capture_response: bool | None = None, capture_error: bool | None = None, ) -> AnyCallableT: """Decorator to create subsegment for arbitrary functions It also captures both response and exceptions as metadata and creates a subsegment named `## <method_module.method_qualifiedname>` # see here: [Qualified name for classes and functions](https://peps.python.org/pep-3155/) When running [async functions concurrently](https://docs.python.org/3/library/asyncio-task.html#id6), methods may impact each others subsegment, and can trigger and AlreadyEndedException from X-Ray due to async nature. For this use case, either use `capture_method` only where `async.gather` is called, or use `in_subsegment_async` context manager via our escape hatch mechanism - See examples. Parameters ---------- method : Callable Method to annotate on capture_response : bool, optional Instructs tracer to not include method's response as metadata capture_error : bool, optional Instructs tracer to not include handler's error as metadata, by default True Example ------- **Custom function using capture_method decorator** tracer = Tracer(service="payment") @tracer.capture_method def some_function() **Custom async method using capture_method decorator** from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method async def confirm_booking(booking_id: str) -> dict: resp = call_to_booking_service() tracer.put_annotation("BookingConfirmation", resp["requestId"]) tracer.put_metadata("Booking confirmation", resp) return resp def lambda_handler(event: dict, context: Any) -> dict: booking_id = event.get("booking_id") asyncio.run(confirm_booking(booking_id=booking_id)) **Custom generator function using capture_method decorator** from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method def bookings_generator(booking_id): resp = call_to_booking_service() yield resp[0] yield resp[1] def lambda_handler(event: dict, context: Any) -> dict: gen = bookings_generator(booking_id=booking_id) result = list(gen) **Custom generator context manager using capture_method decorator** from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method @contextlib.contextmanager def booking_actions(booking_id): resp = call_to_booking_service() yield "example result" cleanup_stuff() def lambda_handler(event: dict, context: Any) -> dict: booking_id = event.get("booking_id") with booking_actions(booking_id=booking_id) as booking: result = booking **Tracing nested async calls** from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method async def get_identity(): ... @tracer.capture_method async def long_async_call(): ... @tracer.capture_method async def async_tasks(): await get_identity() ret = await long_async_call() return { "task": "done", **ret } **Safely tracing concurrent async calls with decorator** This may not needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164) from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") async def get_identity(): async with aioboto3.client("sts") as sts: account = await sts.get_caller_identity() return account async def long_async_call(): ... @tracer.capture_method async def async_tasks(): _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True) return { "task": "done", **ret } **Safely tracing each concurrent async calls with escape hatch** This may not needed once [this bug is closed](https://github.com/aws/aws-xray-sdk-python/issues/164) from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") async def get_identity(): async tracer.provider.in_subsegment_async("## get_identity"): ... async def long_async_call(): async tracer.provider.in_subsegment_async("## long_async_call"): ... @tracer.capture_method async def async_tasks(): _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True) return { "task": "done", **ret } Raises ------ err Exception raised by method """ # If method is None we've been called with parameters # Return a partial function with args filled if method is None: logger.debug("Decorator called with parameters") return cast( AnyCallableT, functools.partial(self.capture_method, capture_response=capture_response, capture_error=capture_error), ) # Example: app.ClassA.get_all # noqa ERA001 # Valid characters can be found at http://docs.aws.amazon.com/xray/latest/devguide/xray-api-segmentdocuments.html method_name = sanitize_xray_segment_name(f"{method.__module__}.{method.__qualname__}") capture_response = resolve_truthy_env_var_choice( env=os.getenv(constants.TRACER_CAPTURE_RESPONSE_ENV, "true"), choice=capture_response, ) capture_error = resolve_truthy_env_var_choice( env=os.getenv(constants.TRACER_CAPTURE_ERROR_ENV, "true"), choice=capture_error, ) # Maintenance: Need a factory/builder here to simplify this now if inspect.iscoroutinefunction(method): return self._decorate_async_function( method=method, capture_response=capture_response, capture_error=capture_error, method_name=method_name, ) elif inspect.isgeneratorfunction(method): return self._decorate_generator_function( method=method, capture_response=capture_response, capture_error=capture_error, method_name=method_name, ) elif hasattr(method, "__wrapped__") and inspect.isgeneratorfunction(method.__wrapped__): return self._decorate_generator_function_with_context_manager( method=method, capture_response=capture_response, capture_error=capture_error, method_name=method_name, ) else: return self._decorate_sync_function( method=method, capture_response=capture_response, capture_error=capture_error, method_name=method_name, ) def _decorate_async_function( self, method: Callable, capture_response: bool | str | None = None, capture_error: bool | str | None = None, method_name: str | None = None, ): @functools.wraps(method) async def decorate(*args, **kwargs): async with self.provider.in_subsegment_async(name=f"## {method_name}") as subsegment: try: logger.debug(f"Calling method: {method_name}") response = await method(*args, **kwargs) self._add_response_as_metadata( method_name=method_name, data=response, subsegment=subsegment, capture_response=capture_response, ) except Exception as err: logger.exception(f"Exception received from '{method_name}' method") self._add_full_exception_as_metadata( method_name=method_name, error=err, subsegment=subsegment, capture_error=capture_error, ) raise return response return decorate def _decorate_generator_function( self, method: Callable, capture_response: bool | str | None = None, capture_error: bool | str | None = None, method_name: str | None = None, ): @functools.wraps(method) def decorate(*args, **kwargs): with self.provider.in_subsegment(name=f"## {method_name}") as subsegment: try: logger.debug(f"Calling method: {method_name}") result = yield from method(*args, **kwargs) self._add_response_as_metadata( method_name=method_name, data=result, subsegment=subsegment, capture_response=capture_response, ) except Exception as err: logger.exception(f"Exception received from '{method_name}' method") self._add_full_exception_as_metadata( method_name=method_name, error=err, subsegment=subsegment, capture_error=capture_error, ) raise return result return decorate def _decorate_generator_function_with_context_manager( self, method: Callable, capture_response: bool | str | None = None, capture_error: bool | str | None = None, method_name: str | None = None, ): @functools.wraps(method) @contextlib.contextmanager def decorate(*args, **kwargs): with self.provider.in_subsegment(name=f"## {method_name}") as subsegment: try: logger.debug(f"Calling method: {method_name}") with method(*args, **kwargs) as return_val: result = return_val yield result self._add_response_as_metadata( method_name=method_name, data=result, subsegment=subsegment, capture_response=capture_response, ) except Exception as err: logger.exception(f"Exception received from '{method_name}' method") self._add_full_exception_as_metadata( method_name=method_name, error=err, subsegment=subsegment, capture_error=capture_error, ) raise return decorate def _decorate_sync_function( self, method: AnyCallableT, capture_response: bool | str | None = None, capture_error: bool | str | None = None, method_name: str | None = None, ) -> AnyCallableT: @functools.wraps(method) def decorate(*args, **kwargs): with self.provider.in_subsegment(name=f"## {method_name}") as subsegment: try: logger.debug(f"Calling method: {method_name}") response = method(*args, **kwargs) self._add_response_as_metadata( method_name=method_name, data=response, subsegment=subsegment, capture_response=capture_response, ) except Exception as err: logger.exception(f"Exception received from '{method_name}' method") self._add_full_exception_as_metadata( method_name=method_name, error=err, subsegment=subsegment, capture_error=capture_error, ) raise return response return cast(AnyCallableT, decorate) def _add_response_as_metadata( self, method_name: str | None = None, data: Any | None = None, subsegment: BaseSegment | None = None, capture_response: bool | str | None = None, ): """Add response as metadata for given subsegment Parameters ---------- method_name : str, optional method name to add as metadata key, by default None data : Any, optional data to add as subsegment metadata, by default None subsegment : BaseSegment, optional existing subsegment to add metadata on, by default None capture_response : bool, optional Do not include response as metadata """ if data is None or not capture_response or subsegment is None: return subsegment.put_metadata(key=f"{method_name} response", value=data, namespace=self.service) def _add_full_exception_as_metadata( self, method_name: str, error: Exception, subsegment: BaseSegment, capture_error: bool | None = None, ): """Add full exception object as metadata for given subsegment Parameters ---------- method_name : str method name to add as metadata key, by default None error : Exception error to add as subsegment metadata, by default None subsegment : BaseSegment existing subsegment to add metadata on, by default None capture_error : bool, optional Do not include error as metadata, by default True """ if not capture_error: return subsegment.put_metadata(key=f"{method_name} error", value=error, namespace=self.service) @staticmethod def _disable_tracer_provider(): """Forcefully disables tracing""" logger.debug("Disabling tracer provider...") aws_xray_sdk.global_sdk_config.set_sdk_enabled(False) @staticmethod def _is_tracer_disabled() -> bool | str: """Detects whether trace has been disabled Tracing is automatically disabled in the following conditions: 1. Explicitly disabled via `TRACE_DISABLED` environment variable 2. Running in Lambda Emulators, or locally where X-Ray Daemon will not be listening 3. Explicitly disabled via constructor e.g `Tracer(disabled=True)` Returns ------- bool | str """ logger.debug("Verifying whether Tracing has been disabled") is_lambda_env = os.getenv(constants.LAMBDA_TASK_ROOT_ENV) is_lambda_sam_cli = os.getenv(constants.SAM_LOCAL_ENV) is_chalice_cli = os.getenv(constants.CHALICE_LOCAL_ENV) is_disabled = resolve_truthy_env_var_choice(env=os.getenv(constants.TRACER_DISABLED_ENV, "false")) if is_disabled: logger.debug("Tracing has been disabled via env var POWERTOOLS_TRACE_DISABLED") return is_disabled if not is_lambda_env or (is_lambda_sam_cli or is_chalice_cli): logger.debug("Running outside Lambda env; disabling Tracing") return True return False def __build_config( self, service: str | None = None, disabled: bool | None = None, auto_patch: bool | None = None, patch_modules: Sequence[str] | None = None, provider: BaseProvider | None = None, ): """Populates Tracer config for new and existing initializations""" is_disabled = disabled if disabled is not None else self._is_tracer_disabled() is_service = resolve_env_var_choice(choice=service, env=os.getenv(constants.SERVICE_NAME_ENV)) # Logic: Choose overridden option first, previously cached config, or default if available self._config["provider"] = provider or self._config["provider"] or self._patch_xray_provider() self._config["auto_patch"] = auto_patch if auto_patch is not None else self._config["auto_patch"] self._config["service"] = is_service or self._config["service"] self._config["disabled"] = is_disabled or self._config["disabled"] self._config["patch_modules"] = patch_modules or self._config["patch_modules"] @classmethod def _reset_config(cls): cls._config = copy.copy(cls._default_config) def _patch_xray_provider(self): # Due to Lazy Import, we need to activate `core` attrib via import # we also need to include `patch`, `patch_all` methods # to ensure patch calls are done via the provider from aws_xray_sdk.core import xray_recorder # type: ignore provider = xray_recorder provider.patch = aws_xray_sdk.core.patch provider.patch_all = aws_xray_sdk.core.patch_all return provider def _disable_xray_trace_batching(self): """Configure X-Ray SDK to send subsegment individually over batching Known issue: https://github.com/aws-powertools/powertools-lambda-python/issues/283 """ if self.disabled: logger.debug("Tracing has been disabled, aborting streaming override") return aws_xray_sdk.core.xray_recorder.configure(streaming_threshold=0) def _is_xray_provider(self): return "aws_xray_sdk" in self.provider.__module__ def ignore_endpoint(self, hostname: str | None = None, urls: list[str] | None = None): """If you want to ignore certain httplib requests you can do so based on the hostname or URL that is being requested. > NOTE: If the provider is not xray, nothing will be added to ignore list Documentation -------------- - https://github.com/aws/aws-xray-sdk-python#ignoring-httplib-requests Parameters ---------- hostname : Optional, str The hostname is matched using the Python fnmatch library which does Unix glob style matching. urls: Optional, list[str] List of urls to ignore. Example `tracer.ignore_endpoint(urls=["/ignored-url"])` """ if not self._is_xray_provider(): return from aws_xray_sdk.ext.httplib import add_ignored # type: ignore add_ignored(hostname=hostname, urls=urls)
Methods
def capture_lambda_handler(self, lambda_handler: Callable[[T, Any], Any] | Callable[[T, Any, Any], Any] | None = None, capture_response: bool | None = None, capture_error: bool | None = None)
-
Decorator to create subsegment for lambda handlers
As Lambda follows (event, context) signature we can remove some of the boilerplate and also capture any exception any Lambda function throws or its response as metadata
Parameters
lambda_handler
:Callable
- Method to annotate on
capture_response
:bool
, optional- Instructs tracer to not include handler's response as metadata
capture_error
:bool
, optional- Instructs tracer to not include handler's error as metadata, by default True
Example
Lambda function using capture_lambda_handler decorator
tracer = Tracer(service="payment") @tracer.capture_lambda_handler def handler(event, context): ...
Preventing Tracer to log response as metadata
tracer = Tracer(service="payment") @tracer.capture_lambda_handler(capture_response=False) def handler(event, context): ...
Raises
err
- Exception raised by method
def capture_method(self, method: AnyCallableT | None = None, capture_response: bool | None = None, capture_error: bool | None = None) ‑> ~AnyCallableT
-
Decorator to create subsegment for arbitrary functions
It also captures both response and exceptions as metadata and creates a subsegment named
## <method_module.method_qualifiedname>
see here: Qualified name for classes and functions
When running async functions concurrently, methods may impact each others subsegment, and can trigger and AlreadyEndedException from X-Ray due to async nature.
For this use case, either use
capture_method
only whereasync.gather
is called, or usein_subsegment_async
context manager via our escape hatch mechanism - See examples.Parameters
method
:Callable
- Method to annotate on
capture_response
:bool
, optional- Instructs tracer to not include method's response as metadata
capture_error
:bool
, optional- Instructs tracer to not include handler's error as metadata, by default True
Example
Custom function using capture_method decorator
tracer = Tracer(service="payment") @tracer.capture_method def some_function()
Custom async method using capture_method decorator
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method async def confirm_booking(booking_id: str) -> dict: resp = call_to_booking_service() tracer.put_annotation("BookingConfirmation", resp["requestId"]) tracer.put_metadata("Booking confirmation", resp) return resp def lambda_handler(event: dict, context: Any) -> dict: booking_id = event.get("booking_id") asyncio.run(confirm_booking(booking_id=booking_id))
Custom generator function using capture_method decorator
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method def bookings_generator(booking_id): resp = call_to_booking_service() yield resp[0] yield resp[1] def lambda_handler(event: dict, context: Any) -> dict: gen = bookings_generator(booking_id=booking_id) result = list(gen)
Custom generator context manager using capture_method decorator
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method @contextlib.contextmanager def booking_actions(booking_id): resp = call_to_booking_service() yield "example result" cleanup_stuff() def lambda_handler(event: dict, context: Any) -> dict: booking_id = event.get("booking_id") with booking_actions(booking_id=booking_id) as booking: result = booking
Tracing nested async calls
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") @tracer.capture_method async def get_identity(): ... @tracer.capture_method async def long_async_call(): ... @tracer.capture_method async def async_tasks(): await get_identity() ret = await long_async_call() return { "task": "done", **ret }
Safely tracing concurrent async calls with decorator
This may not needed once this bug is closed
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") async def get_identity(): async with aioboto3.client("sts") as sts: account = await sts.get_caller_identity() return account async def long_async_call(): ... @tracer.capture_method async def async_tasks(): _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True) return { "task": "done", **ret }
Safely tracing each concurrent async calls with escape hatch
This may not needed once this bug is closed
from aws_lambda_powertools import Tracer tracer = Tracer(service="booking") async def get_identity(): async tracer.provider.in_subsegment_async("## get_identity"): ... async def long_async_call(): async tracer.provider.in_subsegment_async("## long_async_call"): ... @tracer.capture_method async def async_tasks(): _, ret = await asyncio.gather(get_identity(), long_async_call(), return_exceptions=True) return { "task": "done", **ret }
Raises
err
- Exception raised by method
def ignore_endpoint(self, hostname: str | None = None, urls: list[str] | None = None)
-
If you want to ignore certain httplib requests you can do so based on the hostname or URL that is being requested.
NOTE: If the provider is not xray, nothing will be added to ignore list
Documentation
Parameters
hostname
:Optional, str
- The hostname is matched using the Python fnmatch library which does Unix glob style matching.
urls
:Optional, list[str]
- List of urls to ignore. Example
tracer.ignore_endpoint(urls=["/ignored-url"])
def patch(self, modules: Sequence[str] | None = None)
-
Patch modules for instrumentation.
Patches all supported modules by default if none are given.
Parameters
modules
:Sequence[str] | None
- List of modules to be patched, optional by default
def put_annotation(self, key: str, value: str | numbers.Number | bool)
-
Adds annotation to existing segment or subsegment
Parameters
key
:str
- Annotation key
value
:str | numbers.Number | bool
- Value for annotation
Example
Custom annotation for a pseudo service named payment
tracer = Tracer(service="payment") tracer.put_annotation("PaymentStatus", "CONFIRMED")
def put_metadata(self, key: str, value: Any, namespace: str | None = None)
-
Adds metadata to existing segment or subsegment
Parameters
key
:str
- Metadata key
value
:any
- Value for metadata
namespace
:str
, optional- Namespace that metadata will lie under, by default None
Example
Custom metadata for a pseudo service named payment
tracer = Tracer(service="payment") response = collect_payment() tracer.put_metadata("Payment collection", response)