Module aws_lambda_powertools.metrics.provider.datadog.metrics
Expand source code
# NOTE: keeps for compatibility
from __future__ import annotations
from typing import Any, Callable, Dict, List, Optional
from aws_lambda_powertools.metrics.provider.datadog.datadog import DatadogProvider
class DatadogMetrics:
"""
DatadogProvider creates metrics asynchronously via Datadog extension or exporter.
**Use `aws_lambda_powertools.DatadogMetrics` to create and metrics to Datadog.**
Example
-------
**Creates a few metrics and publish at the end of a function execution**
from aws_lambda_powertools.metrics.provider.datadog import DatadogMetrics
metrics = DatadogMetrics(namespace="ServerlessAirline")
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler():
metrics.add_metric(name="item_sold", value=1, product="latte", order="online")
return True
Environment variables
---------------------
POWERTOOLS_METRICS_NAMESPACE : str
metric namespace
Parameters
----------
flush_to_log : bool, optional
Used when using export instead of Lambda Extension
namespace : str, optional
Namespace for metrics
provider: DatadogProvider, optional
Pre-configured DatadogProvider provider
Raises
------
MetricValueError
When metric value isn't a number
SchemaValidationError
When metric object fails Datadog schema validation
"""
# NOTE: We use class attrs to share metrics data across instances
# this allows customers to initialize Metrics() throughout their code base (and middlewares)
# and not get caught by accident with metrics data loss, or data deduplication
# e.g., m1 and m2 add metric ProductCreated, however m1 has 'version' dimension but m2 doesn't
# Result: ProductCreated is created twice as we now have 2 different EMF blobs
_metrics: List = []
_default_tags: Dict[str, Any] = {}
def __init__(
self,
namespace: str | None = None,
flush_to_log: bool | None = None,
provider: DatadogProvider | None = None,
):
self.metric_set = self._metrics
self.default_tags = self._default_tags
if provider is None:
self.provider = DatadogProvider(
namespace=namespace,
flush_to_log=flush_to_log,
metric_set=self.metric_set,
)
else:
self.provider = provider
def add_metric(
self,
name: str,
value: float,
timestamp: int | None = None,
**tags: Any,
) -> None:
self.provider.add_metric(name=name, value=value, timestamp=timestamp, **tags)
def serialize_metric_set(self, metrics: List | None = None) -> List:
return self.provider.serialize_metric_set(metrics=metrics)
def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None:
self.provider.flush_metrics(raise_on_empty_metrics=raise_on_empty_metrics)
def log_metrics(
self,
lambda_handler: Callable[[Dict, Any], Any] | Optional[Callable[[Dict, Any, Optional[Dict]], Any]] = None,
capture_cold_start_metric: bool = False,
raise_on_empty_metrics: bool = False,
default_tags: Dict[str, Any] | None = None,
):
return self.provider.log_metrics(
lambda_handler=lambda_handler,
capture_cold_start_metric=capture_cold_start_metric,
raise_on_empty_metrics=raise_on_empty_metrics,
default_tags=default_tags,
)
def set_default_tags(self, **tags) -> None:
self.provider.set_default_tags(**tags)
self.default_tags.update(**tags)
def clear_metrics(self) -> None:
self.provider.clear_metrics()
def clear_default_tags(self) -> None:
self.provider.default_tags.clear()
self.default_tags.clear()
# We now allow customers to bring their own instance
# of the DatadogProvider provider
# So we need to define getter/setter for namespace property
# To access this attribute on the provider instance.
@property
def namespace(self):
return self.provider.namespace
@namespace.setter
def namespace(self, namespace):
self.provider.namespace = namespace
Classes
class DatadogMetrics (namespace: str | None = None, flush_to_log: bool | None = None, provider: DatadogProvider | None = None)
-
DatadogProvider creates metrics asynchronously via Datadog extension or exporter.
Use
aws_lambda_powertools.DatadogMetrics
to create and metrics to Datadog.Example
Creates a few metrics and publish at the end of a function execution
from aws_lambda_powertools.metrics.provider.datadog import DatadogMetrics metrics = DatadogMetrics(namespace="ServerlessAirline") @metrics.log_metrics(capture_cold_start_metric=True) def lambda_handler(): metrics.add_metric(name="item_sold", value=1, product="latte", order="online") return True
Environment Variables
POWERTOOLS_METRICS_NAMESPACE : str metric namespace
Parameters
flush_to_log
:bool
, optional- Used when using export instead of Lambda Extension
namespace
:str
, optional- Namespace for metrics
provider
:DatadogProvider
, optional- Pre-configured DatadogProvider provider
Raises
MetricValueError
- When metric value isn't a number
SchemaValidationError
- When metric object fails Datadog schema validation
Expand source code
class DatadogMetrics: """ DatadogProvider creates metrics asynchronously via Datadog extension or exporter. **Use `aws_lambda_powertools.DatadogMetrics` to create and metrics to Datadog.** Example ------- **Creates a few metrics and publish at the end of a function execution** from aws_lambda_powertools.metrics.provider.datadog import DatadogMetrics metrics = DatadogMetrics(namespace="ServerlessAirline") @metrics.log_metrics(capture_cold_start_metric=True) def lambda_handler(): metrics.add_metric(name="item_sold", value=1, product="latte", order="online") return True Environment variables --------------------- POWERTOOLS_METRICS_NAMESPACE : str metric namespace Parameters ---------- flush_to_log : bool, optional Used when using export instead of Lambda Extension namespace : str, optional Namespace for metrics provider: DatadogProvider, optional Pre-configured DatadogProvider provider Raises ------ MetricValueError When metric value isn't a number SchemaValidationError When metric object fails Datadog schema validation """ # NOTE: We use class attrs to share metrics data across instances # this allows customers to initialize Metrics() throughout their code base (and middlewares) # and not get caught by accident with metrics data loss, or data deduplication # e.g., m1 and m2 add metric ProductCreated, however m1 has 'version' dimension but m2 doesn't # Result: ProductCreated is created twice as we now have 2 different EMF blobs _metrics: List = [] _default_tags: Dict[str, Any] = {} def __init__( self, namespace: str | None = None, flush_to_log: bool | None = None, provider: DatadogProvider | None = None, ): self.metric_set = self._metrics self.default_tags = self._default_tags if provider is None: self.provider = DatadogProvider( namespace=namespace, flush_to_log=flush_to_log, metric_set=self.metric_set, ) else: self.provider = provider def add_metric( self, name: str, value: float, timestamp: int | None = None, **tags: Any, ) -> None: self.provider.add_metric(name=name, value=value, timestamp=timestamp, **tags) def serialize_metric_set(self, metrics: List | None = None) -> List: return self.provider.serialize_metric_set(metrics=metrics) def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None: self.provider.flush_metrics(raise_on_empty_metrics=raise_on_empty_metrics) def log_metrics( self, lambda_handler: Callable[[Dict, Any], Any] | Optional[Callable[[Dict, Any, Optional[Dict]], Any]] = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, default_tags: Dict[str, Any] | None = None, ): return self.provider.log_metrics( lambda_handler=lambda_handler, capture_cold_start_metric=capture_cold_start_metric, raise_on_empty_metrics=raise_on_empty_metrics, default_tags=default_tags, ) def set_default_tags(self, **tags) -> None: self.provider.set_default_tags(**tags) self.default_tags.update(**tags) def clear_metrics(self) -> None: self.provider.clear_metrics() def clear_default_tags(self) -> None: self.provider.default_tags.clear() self.default_tags.clear() # We now allow customers to bring their own instance # of the DatadogProvider provider # So we need to define getter/setter for namespace property # To access this attribute on the provider instance. @property def namespace(self): return self.provider.namespace @namespace.setter def namespace(self, namespace): self.provider.namespace = namespace
Instance variables
var namespace
-
Expand source code
@property def namespace(self): return self.provider.namespace
Methods
def add_metric(self, name: str, value: float, timestamp: int | None = None, **tags: Any) ‑> None
-
Expand source code
def add_metric( self, name: str, value: float, timestamp: int | None = None, **tags: Any, ) -> None: self.provider.add_metric(name=name, value=value, timestamp=timestamp, **tags)
-
Expand source code
def clear_default_tags(self) -> None: self.provider.default_tags.clear() self.default_tags.clear()
def clear_metrics(self) ‑> None
-
Expand source code
def clear_metrics(self) -> None: self.provider.clear_metrics()
def flush_metrics(self, raise_on_empty_metrics: bool = False) ‑> None
-
Expand source code
def flush_metrics(self, raise_on_empty_metrics: bool = False) -> None: self.provider.flush_metrics(raise_on_empty_metrics=raise_on_empty_metrics)
def log_metrics(self, lambda_handler: Callable[[Dict, Any], Any] | Optional[Callable[[Dict, Any, Optional[Dict]], Any]] = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, default_tags: Dict[str, Any] | None = None)
-
Expand source code
def log_metrics( self, lambda_handler: Callable[[Dict, Any], Any] | Optional[Callable[[Dict, Any, Optional[Dict]], Any]] = None, capture_cold_start_metric: bool = False, raise_on_empty_metrics: bool = False, default_tags: Dict[str, Any] | None = None, ): return self.provider.log_metrics( lambda_handler=lambda_handler, capture_cold_start_metric=capture_cold_start_metric, raise_on_empty_metrics=raise_on_empty_metrics, default_tags=default_tags, )
def serialize_metric_set(self, metrics: List | None = None) ‑> List
-
Expand source code
def serialize_metric_set(self, metrics: List | None = None) -> List: return self.provider.serialize_metric_set(metrics=metrics)
-
Expand source code
def set_default_tags(self, **tags) -> None: self.provider.set_default_tags(**tags) self.default_tags.update(**tags)