Module aws_lambda_powertools.metrics.provider.datadog

Sub-modules

aws_lambda_powertools.metrics.provider.datadog.datadog
aws_lambda_powertools.metrics.provider.datadog.metrics
aws_lambda_powertools.metrics.provider.datadog.warnings

Classes

class DatadogMetrics (namespace: str | None = None, flush_to_log: bool | None = None, provider: DatadogProvider | None = None)

DatadogProvider creates metrics asynchronously via Datadog extension or exporter.

Use aws_lambda_powertools.DatadogMetrics to create and publish metrics to Datadog.

Example

Creates a few metrics and publishes them at the end of a function execution

from aws_lambda_powertools.metrics.provider.datadog import DatadogMetrics

metrics = DatadogMetrics(namespace="ServerlessAirline")

@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler():
    metrics.add_metric(name="item_sold", value=1, product="latte", order="online")
    return True

Environment Variables

POWERTOOLS_METRICS_NAMESPACE : str metric namespace

Parameters

flush_to_log : bool, optional
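When enabled, metrics are printed to standard output as JSON (for the Datadog log forwarder) instead of being submitted through the Datadog Lambda Extension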
namespace : str, optional
Namespace for metrics
provider : DatadogProvider, optional
Pre-configured DatadogProvider provider

Raises

MetricValueError
When metric value isn't a number
SchemaValidationError
When metric object fails Datadog schema validation
Expand source code
class DatadogMetrics:
    """
    Thin wrapper that forwards every metrics operation to a DatadogProvider.

    DatadogProvider creates metrics asynchronously via Datadog extension or exporter.

    **Use `aws_lambda_powertools.DatadogMetrics` to create and publish metrics to Datadog.**

    Example
    -------
    **Creates a few metrics and publishes them at the end of a function execution**

        from aws_lambda_powertools.metrics.provider.datadog import DatadogMetrics

        metrics = DatadogMetrics(namespace="ServerlessAirline")

        @metrics.log_metrics(capture_cold_start_metric=True)
        def lambda_handler():
            metrics.add_metric(name="item_sold", value=1, product="latte", order="online")
            return True

    Environment variables
    ---------------------
    POWERTOOLS_METRICS_NAMESPACE : str
        metric namespace

    Parameters
    ----------
    flush_to_log : bool, optional
        Forwarded to DatadogProvider when no provider is supplied
    namespace : str, optional
        Namespace for metrics; forwarded to DatadogProvider when no provider is supplied
    provider: DatadogProvider, optional
        Pre-configured DatadogProvider instance; when given, it is used as-is

    Raises
    ------
    MetricValueError
        When metric value isn't a number
    SchemaValidationError
        When metric object fails Datadog schema validation
    """

    # NOTE: metrics and default tags live on the class, not on the instance, so that
    # every DatadogMetrics() created across a code base (and its middlewares) works on
    # the same data. This protects customers from accidental metrics data loss or
    # data deduplication, e.g., m1 and m2 both add metric ProductCreated, but only m1
    # has a 'version' dimension -> ProductCreated would otherwise be created twice.
    _metrics: List = []
    _default_tags: Dict[str, Any] = {}

    def __init__(
        self,
        namespace: str | None = None,
        flush_to_log: bool | None = None,
        provider: DatadogProvider | None = None,
    ):
        # Instance attributes are aliases of the shared class-level containers
        self.metric_set = self._metrics
        self.default_tags = self._default_tags

        # A caller-supplied provider wins; namespace/flush_to_log are not applied to it
        if provider is not None:
            self.provider = provider
            return

        self.provider = DatadogProvider(
            metric_set=self.metric_set,
            namespace=namespace,
            flush_to_log=flush_to_log,
        )

    def add_metric(
        self,
        name: str,
        value: float,
        timestamp: int | None = None,
        **tags: Any,
    ) -> None:
        """Forward a single metric, with its tags, to the provider."""
        backend = self.provider
        backend.add_metric(name=name, value=value, timestamp=timestamp, **tags)

    def serialize_metric_set(self, metrics: List | None = None) -> List:
        """Return whatever the provider produces when serializing ``metrics``."""
        serialized = self.provider.serialize_metric_set(metrics=metrics)
        return serialized

    def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None:
        """Ask the provider to flush its metrics."""
        backend = self.provider
        backend.flush_metrics(raise_on_empty_metrics=raise_on_empty_metrics)

    def log_metrics(
        self,
        lambda_handler: AnyCallableT | None = None,
        capture_cold_start_metric: bool = False,
        raise_on_empty_metrics: bool = False,
        default_tags: Dict[str, Any] | None = None,
    ):
        """Return the result of the provider's ``log_metrics`` called with the same options."""
        decorator_options = {
            "lambda_handler": lambda_handler,
            "capture_cold_start_metric": capture_cold_start_metric,
            "raise_on_empty_metrics": raise_on_empty_metrics,
            "default_tags": default_tags,
        }
        return self.provider.log_metrics(**decorator_options)

    def set_default_tags(self, **tags) -> None:
        """Register default tags on the provider first, then mirror them locally."""
        self.provider.set_default_tags(**tags)
        self.default_tags.update(tags)

    def clear_metrics(self) -> None:
        """Ask the provider to clear its metrics."""
        backend = self.provider
        backend.clear_metrics()

    def clear_default_tags(self) -> None:
        """Empty both the provider's default tags and the local copy."""
        for tag_store in (self.provider.default_tags, self.default_tags):
            tag_store.clear()

    # Customers may bring their own DatadogProvider instance, so the namespace
    # is not stored here: reads and writes go straight to the provider.
    @property
    def namespace(self):
        """Namespace currently set on the provider."""
        backend = self.provider
        return backend.namespace

    @namespace.setter
    def namespace(self, namespace):
        backend = self.provider
        backend.namespace = namespace

Instance variables

prop namespace
Expand source code
@property
def namespace(self):
    """Namespace currently set on the provider."""
    backend = self.provider
    return backend.namespace

Methods

def add_metric(self, name: str, value: float, timestamp: int | None = None, **tags: Any) ‑> None
def clear_default_tags(self) ‑> None
def clear_metrics(self) ‑> None
def flush_metrics(self, raise_on_empty_metrics: bool = False) ‑> None
def log_metrics(self, lambda_handler: AnyCallableT | None = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, default_tags: Dict[str, Any] | None = None)
def serialize_metric_set(self, metrics: List | None = None) ‑> List
def set_default_tags(self, **tags) ‑> None
class DatadogProvider (metric_set: List | None = None, namespace: str | None = None, flush_to_log: bool | None = None, default_tags: Dict[str, Any] | None = None)

DatadogProvider creates metrics asynchronously via Datadog extension or exporter.

Use aws_lambda_powertools.DatadogMetrics to create and publish metrics to Datadog.

Environment Variables

POWERTOOLS_METRICS_NAMESPACE : str metric namespace to be set for all metrics

Raises

MetricValueError
When metric value isn't a number
SchemaValidationError
When metric object fails Datadog schema validation
Expand source code
class DatadogProvider(BaseProvider):
    """
    DatadogProvider creates metrics asynchronously via Datadog extension or exporter.

    **Use `aws_lambda_powertools.DatadogMetrics` to create and publish metrics to Datadog.**

    Environment variables
    ---------------------
    POWERTOOLS_METRICS_NAMESPACE : str
        metric namespace to be set for all metrics

    Parameters
    ----------
    metric_set : List, optional
        List used to accumulate metrics; a new list is created when omitted
    namespace : str, optional
        Namespace for metrics; falls back to the environment, then to DEFAULT_NAMESPACE
    flush_to_log : bool, optional
        When truthy, metrics are printed as JSON instead of being submitted via `lambda_metric`
    default_tags : Dict[str, Any], optional
        Tags merged into every metric at serialization time

    Raises
    ------
    MetricValueError
        When metric value isn't a number
    SchemaValidationError
        When metric object fails Datadog schema validation
    """

    def __init__(
        self,
        metric_set: List | None = None,
        namespace: str | None = None,
        flush_to_log: bool | None = None,
        default_tags: Dict[str, Any] | None = None,
    ):
        # `is not None` (rather than truthiness) keeps a caller-supplied empty list shared
        self.metric_set = metric_set if metric_set is not None else []
        self.namespace = (
            resolve_env_var_choice(choice=namespace, env=os.getenv(constants.METRICS_NAMESPACE_ENV))
            or DEFAULT_NAMESPACE
        )
        self.default_tags = default_tags or {}
        self.flush_to_log = resolve_env_var_choice(choice=flush_to_log, env=os.getenv(constants.DATADOG_FLUSH_TO_LOG))

    def add_metric(
        self,
        name: str,
        value: float,
        timestamp: int | None = None,
        **tags,
    ) -> None:
        """
        Validate a metric and append it to the in-memory metric set.

        Parameters
        ----------
        name: str
            Name/Key for the metric
        value: float
            Value for the metric
        timestamp: int, optional
            Timestamp in int for the metric, default = int(time.time())
        tags: Any
            Extra keyword arguments are stored as tags,
            e.g., add_metric(sales="sam") -> tags=['sales:sam'] once serialized

        Raises
        ------
        SchemaValidationError
            When the metric name is empty or doesn't follow the naming requirements
        MetricValueError
            When the metric value isn't a number

        Examples
        --------
            >>> provider = DatadogProvider()
            >>>
            >>> provider.add_metric(
            >>>     name='coffee_house.order_value',
            >>>     value=12.45,
            >>>     product='latte',
            >>>     order='online',
            >>>     sales='sam'
            >>> )
        """

        # validating metric name
        if not self._validate_datadog_metric_name(name):
            docs = "https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics"
            raise SchemaValidationError(
                f"Invalid metric name. Please ensure the metric {name} follows the requirements. \n"
                f"See Datadog documentation here: \n {docs}",
            )

        # validating metric tags: invalid tags only produce a warning, the metric is still added
        self._validate_datadog_tags_name(tags)

        if not isinstance(value, numbers.Real):
            raise MetricValueError(f"{value} is not a valid number")

        # any falsy timestamp (None or 0) is replaced by the current time
        if not timestamp:
            timestamp = int(time.time())

        logger.debug({"details": "Appending metric", "metrics": name})
        self.metric_set.append({"m": name, "v": value, "e": timestamp, "t": tags})

    def serialize_metric_set(self, metrics: List | None = None) -> List:
        """Serializes metrics

        The namespace is prepended to each metric name (``<namespace>.<name>``) unless it is
        DEFAULT_NAMESPACE, and metric tags are merged with the default tags.

        Example
        -------
        **Serialize metrics into Datadog format**

            metrics = DatadogMetrics()
            # ...add metrics, tags, namespace
            ret = metrics.serialize_metric_set()

        Parameters
        ----------
        metrics : List, optional
            Metrics to serialize, by default the provider's own metric set

        Returns
        -------
        List
            Serialized metrics following Datadog specification

        Raises
        ------
        SchemaValidationError
            Raised when there are no metrics to serialize
        """

        if metrics is None:  # pragma: no cover
            metrics = self.metric_set

        if not metrics:
            raise SchemaValidationError("Must contain at least one metric.")

        output_list: List = []

        logger.debug({"details": "Serializing metrics", "metrics": metrics})

        for single_metric in metrics:
            if self.namespace != DEFAULT_NAMESPACE:
                metric_name = f"{self.namespace}.{single_metric['m']}"
            else:
                metric_name = single_metric["m"]

            output_list.append(
                {
                    "m": metric_name,
                    "v": single_metric["v"],
                    "e": single_metric["e"],
                    "t": self._serialize_datadog_tags(metric_tags=single_metric["t"], default_tags=self.default_tags),
                },
            )

        return output_list

    def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None:
        """Manually flushes the metrics. This is normally not necessary,
        unless you're running on other runtimes besides Lambda, where the @log_metrics
        decorator already handles things for you.

        Parameters
        ----------
        raise_on_empty_metrics : bool, optional
            raise exception if no metrics are emitted, by default False

        Raises
        ------
        SchemaValidationError
            When there are no metrics and raise_on_empty_metrics is True
        """
        if not raise_on_empty_metrics and not self.metric_set:
            warnings.warn(
                "No application metrics to publish. The cold-start metric may be published if enabled. "
                "If application metrics should never be empty, consider using 'raise_on_empty_metrics'",
                stacklevel=2,
            )
            return

        logger.debug("Flushing existing metrics")
        # raises SchemaValidationError when the metric set is empty
        metrics = self.serialize_metric_set()

        # submit through datadog extension
        if lambda_metric and not self.flush_to_log:
            # use lambda_metric function from datadog package, submit metrics to datadog
            for metric_item in metrics:  # pragma: no cover
                lambda_metric(  # pragma: no cover
                    metric_name=metric_item["m"],
                    value=metric_item["v"],
                    timestamp=metric_item["e"],
                    tags=metric_item["t"],
                )
        else:
            # dd module not found: flush to log, this format can be recognized via datadog log forwarder
            # https://github.com/Datadog/datadog-lambda-python/blob/main/datadog_lambda/metric.py#L77
            for metric_item in metrics:
                print(json.dumps(metric_item, separators=(",", ":")))

        self.clear_metrics()

    def clear_metrics(self):
        """Remove every metric from the in-memory metric set (in place)."""
        logger.debug("Clearing out existing metric set from memory")
        self.metric_set.clear()

    def add_cold_start_metric(self, context: LambdaContext) -> None:
        """Add cold start metric with a function_name tag

        Parameters
        ----------
        context : LambdaContext
            Lambda context; its `function_name` is used as the tag value
        """
        logger.debug("Adding cold start metric and function_name tagging")
        self.add_metric(name="ColdStart", value=1, function_name=context.function_name)

    def log_metrics(
        self,
        lambda_handler: AnyCallableT | None = None,
        capture_cold_start_metric: bool = False,
        raise_on_empty_metrics: bool = False,
        **kwargs,
    ):
        """Decorator to serialize and publish metrics at the end of a function execution.

        Be aware that the log_metrics **does call** the decorated function (e.g. lambda_handler).

        Example
        -------
        **Lambda function using tracer and metrics decorators**

            from aws_lambda_powertools import Tracer
            from aws_lambda_powertools.metrics.provider.datadog import DatadogMetrics

            metrics = DatadogMetrics(namespace="powertools")
            tracer = Tracer(service="payment")

            @tracer.capture_lambda_handler
            @metrics.log_metrics
            def handler(event, context):
                    ...

        Parameters
        ----------
        lambda_handler : Callable[[Any, Any], Any], optional
            lambda function handler, by default None
        capture_cold_start_metric : bool, optional
            captures cold start metric, by default False
        raise_on_empty_metrics : bool, optional
            raise exception if no metrics are emitted, by default False
        **kwargs
            default_tags : Dict[str, Any], optional
                tags registered through set_default_tags before the handler is decorated

        Raises
        ------
        e
            Propagate error received
        """

        default_tags = kwargs.get("default_tags")

        if default_tags:
            self.set_default_tags(**default_tags)

        # kwargs (including default_tags) are forwarded untouched to the parent implementation
        return super().log_metrics(
            lambda_handler=lambda_handler,
            capture_cold_start_metric=capture_cold_start_metric,
            raise_on_empty_metrics=raise_on_empty_metrics,
            **kwargs,
        )

    def set_default_tags(self, **tags) -> None:
        """Persist tags across Lambda invocations

        Parameters
        ----------
        tags : **kwargs
            tags as key=value

        Example
        -------
        **Sets some default tags that will always be present across metrics and invocations**

            from aws_lambda_powertools.metrics.provider.datadog import DatadogMetrics

            metrics = DatadogMetrics(namespace="ServerlessAirline")
            metrics.set_default_tags(environment="demo", another="one")

            @metrics.log_metrics()
            def lambda_handler():
                return True
        """
        # invalid tags only produce a warning; they are still stored
        self._validate_datadog_tags_name(tags)
        self.default_tags.update(**tags)

    @staticmethod
    def _serialize_datadog_tags(
        metric_tags: Dict[str, Any] | None,
        default_tags: Dict[str, Any] | None,
    ) -> List[str]:
        """
        Serialize metric tags into a list of formatted strings for Datadog integration.

        Metric-specific tags and default tags are merged, then converted into
        a list of strings in the format "tag_key:tag_value".

        Parameters
        ----------
        metric_tags: Dict[str, Any], optional
            A dictionary containing metric-specific tags; None is treated as no tags.
        default_tags: Dict[str, Any], optional
            A dictionary containing default tags applicable to all metrics; None is treated as no tags.

        Returns:
        -------
        List[str]
            A list of formatted tag strings, each in the "tag_key:tag_value" format.

        Example:
            >>> metric_tags = {'environment': 'production', 'service': 'web'}
            >>> DatadogProvider._serialize_datadog_tags(metric_tags, None)
            ['environment:production', 'service:web']
        """

        # We need to create a new dictionary by combining default_tags first,
        # and then metric_tags on top of it. This ensures that the keys from metric_tags take precedence
        # and replace corresponding keys in default_tags.
        # `or {}` lets callers pass None for either side instead of failing on `{**None}`.
        tags = {**(default_tags or {}), **(metric_tags or {})}

        return [f"{tag_key}:{tag_value}" for tag_key, tag_value in tags.items()]

    @staticmethod
    def _validate_datadog_tags_name(tags: Dict) -> None:
        """
        Validate metric tags according to specific requirements.

        Metric tags must start with a letter.
        Metric tags must not exceed 200 characters. Fewer than 100 is preferred from a UI perspective.

        An invalid tag does not raise: a DatadogDataValidationWarning is emitted instead.

        More information here: https://docs.datadoghq.com/getting_started/tagging/#define-tags

        Parameters:
        ----------
        tags: Dict
            The metric tags to be validated.
        """
        for tag_key, tag_value in tags.items():
            # the rules are checked against the rendered "key:value" form
            tag = f"{tag_key}:{tag_value}"
            if not tag[0].isalpha() or len(tag) > 200:
                docs = "https://docs.datadoghq.com/getting_started/tagging/#define-tags"
                warnings.warn(
                    f"Invalid tag value. Please ensure the specific tag {tag} follows the requirements. \n"
                    f"May incur data loss for metrics. \n"
                    f"See Datadog documentation here: \n {docs}",
                    DatadogDataValidationWarning,
                    stacklevel=2,
                )

    @staticmethod
    def _validate_datadog_metric_name(metric_name: str) -> bool:
        """
        Validate a metric name according to specific requirements.

        Metric names must not be empty.
        Metric names must start with a letter.
        Metric names must only contain ASCII alphanumerics, underscores, and periods.
        Other characters, including spaces, are converted to underscores.
        Unicode is not supported.
        Metric names must not exceed 200 characters. Fewer than 100 is preferred from a UI perspective.

        More information here: https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics

        Parameters:
        ----------
        metric_name: str
            The metric name to be validated.

        Returns:
        -------
        bool
            True if the metric name is valid, False otherwise.
        """

        # An empty name is invalid; checking it first avoids an IndexError on metric_name[0]
        if not metric_name:
            return False

        starts_with_letter = metric_name[0].isalpha()
        within_length = len(metric_name) <= 200

        # The allowed character set is enforced by the module-level METRIC_NAME_REGEX
        return bool(starts_with_letter and within_length and METRIC_NAME_REGEX.match(metric_name))

Ancestors

Methods

def add_cold_start_metric(self, context: LambdaContext) ‑> None

Add cold start metric and function_name dimension

Parameters

context : Any
Lambda context
def add_metric(self, name: str, value: float, timestamp: int | None = None, **tags) ‑> None

The add_metrics function that will be used by metrics class.

Parameters

name : str
Name/Key for the metrics
value : float
Value for the metrics
timestamp : int
Timestamp in int for the metrics, default = time.time()
**tags : Any
Extra keyword arguments are stored as tags, e.g., add_metric(sales="sam") -> tags=['sales:sam']

Examples

>>> provider = DatadogProvider()
>>>
>>> provider.add_metric(
>>>     name='coffee_house.order_value',
>>>     value=12.45,
>>>     product='latte',
>>>     order='online',
>>>     sales='sam'
>>> )
def flush_metrics(self, raise_on_empty_metrics: bool = False) ‑> None

Manually flushes the metrics. This is normally not necessary, unless you're running on other runtimes besides Lambda, where the @log_metrics decorator already handles things for you.

Parameters

raise_on_empty_metrics : bool, optional
raise exception if no metrics are emitted, by default False
def log_metrics(self, lambda_handler: AnyCallableT | None = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, **kwargs)

Decorator to serialize and publish metrics at the end of a function execution.

Be aware that the log_metrics **does call** the decorated function (e.g. lambda_handler).

Example

Lambda function using tracer and metrics decorators

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.metrics.provider.datadog import DatadogMetrics

metrics = DatadogMetrics(namespace="powertools")
tracer = Tracer(service="payment")

@tracer.capture_lambda_handler
@metrics.log_metrics
def handler(event, context):
        ...

Parameters

lambda_handler : Callable[[Any, Any], Any], optional
lambda function handler, by default None
capture_cold_start_metric : bool, optional
captures cold start metric, by default False
raise_on_empty_metrics : bool, optional
raise exception if no metrics are emitted, by default False
**kwargs
 

Raises

e
Propagate error received
def serialize_metric_set(self, metrics: List | None = None) ‑> List

Serializes metrics

Example

Serialize metrics into Datadog format

metrics = DatadogMetrics()
# ...add metrics, tags, namespace
ret = metrics.serialize_metric_set()

Returns

List
Serialized metrics following Datadog specification

Raises

SchemaValidationError
Raised when serialization fail schema validation
def set_default_tags(self, **tags) ‑> None

Persist tags across Lambda invocations

Parameters

tags : **kwargs
tags as key=value

Example

Sets some default tags that will always be present across metrics and invocations

from aws_lambda_powertools.metrics.provider.datadog import DatadogMetrics

metrics = DatadogMetrics(namespace="ServerlessAirline")
metrics.set_default_tags(environment="demo", another="one")

@metrics.log_metrics()
def lambda_handler():
    return True

Inherited members