Module aws_lambda_powertools.utilities.feature_toggles.schema

Expand source code
from enum import Enum
from logging import Logger
from typing import Any, Dict

from .exceptions import ConfigurationError

# Keys of the feature-toggles configuration dictionary, as read by SchemaValidator.
# Top-level key; its value must be a dict mapping feature names to feature definitions.
FEATURES_KEY = "features"
# Feature-level key holding the list of rules; a missing or empty entry means no rules.
RULES_KEY = "rules"
# Feature-level key; its value must be a bool.
FEATURE_DEFAULT_VAL_KEY = "feature_default_value"
# Rule-level key; its value must be a non-empty list of condition dictionaries.
CONDITIONS_KEY = "conditions"
# Rule-level key; its value must be a non-empty string.
RULE_NAME_KEY = "rule_name"
# Rule-level key; its value must be a bool.
RULE_DEFAULT_VALUE = "value_when_applies"
# Condition-level key; its value must be a non-empty string.
CONDITION_KEY = "key"
# Condition-level key; its value must not be missing or empty.
CONDITION_VALUE = "value"
# Condition-level key; its value must be one of the ACTION values.
CONDITION_ACTION = "action"


class ACTION(str, Enum):
    """Accepted values for the ``action`` key of a rule condition.

    Because ``str`` is mixed in, every member is also a plain string equal to
    its value.
    """
    # NOTE(review): the member names suggest string comparisons, but how each
    # action is evaluated is not defined in this module - confirm with the caller.
    EQUALS = "EQUALS"
    STARTSWITH = "STARTSWITH"
    ENDSWITH = "ENDSWITH"
    CONTAINS = "CONTAINS"


class SchemaValidator:
    """Validate the structure of a feature-toggles configuration dictionary.

    The checks below enforce this shape::

        {
            "features": {
                "<feature name>": {
                    "feature_default_value": <bool>,
                    "rules": [
                        {
                            "rule_name": "<non-empty string>",
                            "value_when_applies": <bool>,
                            "conditions": [
                                {"action": "<ACTION value>", "key": "<non-empty string>", "value": <non-empty>}
                            ]
                        }
                    ]
                }
            }
        }

    ``rules`` may be missing or empty; ``conditions`` must be a non-empty list.
    Every failed check is logged at error level and raised as
    ``ConfigurationError``.
    """

    def __init__(self, logger: Logger):
        """Store the logger that receives every validation error message."""
        self._logger = logger

    def _raise_conf_exc(self, error_str: str) -> None:
        """Log ``error_str`` at error level and raise it as ``ConfigurationError``.

        This helper always raises, so callers need no ``return`` after it.
        """
        self._logger.error(error_str)
        raise ConfigurationError(error_str)

    def _validate_condition(self, rule_name: str, condition: Dict[str, Any]) -> None:
        """Validate one condition of the rule named ``rule_name``.

        Raises:
            ConfigurationError: if the condition is not a non-empty dict, its
                action is not an ``ACTION`` value, its key is not a non-empty
                string, or its value is missing/empty.
        """
        if not condition or not isinstance(condition, dict):
            self._raise_conf_exc(f"invalid condition type, not a dictionary, rule_name={rule_name}")
        action = condition.get(CONDITION_ACTION, "")
        if action not in [member.value for member in ACTION]:
            self._raise_conf_exc(f"invalid action value, rule_name={rule_name}, action={action}")
        key = condition.get(CONDITION_KEY, "")
        if not key or not isinstance(key, str):
            self._raise_conf_exc(f"invalid key value, key has to be a non empty string, rule_name={rule_name}")
        value = condition.get(CONDITION_VALUE, "")
        # A plain truthiness test would reject legitimate values such as 0, 0.0
        # and False. Numbers (bool is an int subclass) are therefore always
        # accepted; a missing key, None, "" and empty containers are rejected.
        if not value and not isinstance(value, (int, float)):
            self._raise_conf_exc(f"missing condition value, rule_name={rule_name}")

    def _validate_rule(self, feature_name: str, rule: Dict[str, Any]) -> None:
        """Validate one rule of the feature named ``feature_name``.

        Raises:
            ConfigurationError: if the rule is not a non-empty dict, its name is
                not a non-empty string, its ``value_when_applies`` is not a
                bool, its conditions are not a non-empty list, or any single
                condition is invalid.
        """
        if not rule or not isinstance(rule, dict):
            self._raise_conf_exc(f"feature rule is not a dictionary, feature_name={feature_name}")
        rule_name = rule.get(RULE_NAME_KEY)
        # ``not rule_name`` already covers both None and the empty string.
        if not rule_name or not isinstance(rule_name, str):
            self._raise_conf_exc(f"invalid rule_name, feature_name={feature_name}")
        rule_default_value = rule.get(RULE_DEFAULT_VALUE)
        if rule_default_value is None or not isinstance(rule_default_value, bool):
            self._raise_conf_exc(f"invalid rule_default_value, rule_name={rule_name}")
        conditions = rule.get(CONDITIONS_KEY, {})
        if not conditions or not isinstance(conditions, list):
            self._raise_conf_exc(f"invalid condition, rule_name={rule_name}")
        # validate conditions
        for condition in conditions:
            self._validate_condition(rule_name, condition)

    def _validate_feature(self, feature_name: str, feature_dict_def: Dict[str, Any]) -> None:
        """Validate the definition of the feature named ``feature_name``.

        Raises:
            ConfigurationError: if the definition is not a non-empty dict, its
                default value is not a bool, its rules are not a list, or any
                single rule is invalid.
        """
        if not feature_dict_def or not isinstance(feature_dict_def, dict):
            self._raise_conf_exc(f"invalid AWS AppConfig JSON schema detected, feature {feature_name} is invalid")
        feature_default_value = feature_dict_def.get(FEATURE_DEFAULT_VAL_KEY)
        if feature_default_value is None or not isinstance(feature_default_value, bool):
            self._raise_conf_exc(f"missing feature_default_value for feature, feature_name={feature_name}")
        # validate rules
        rules = feature_dict_def.get(RULES_KEY, [])
        # A missing or empty rules entry means the feature has no rules to check.
        if not rules:
            return
        if not isinstance(rules, list):
            self._raise_conf_exc(f"feature rules is not a list, feature_name={feature_name}")
        for rule in rules:
            self._validate_rule(feature_name, rule)

    def validate_json_schema(self, schema: Dict[str, Any]) -> None:
        """Validate a whole configuration dictionary.

        Args:
            schema: the configuration to check; must be a dict whose
                ``features`` entry is a dict of feature definitions.

        Raises:
            ConfigurationError: if the root or the ``features`` entry is not a
                dict, or any feature definition is invalid.
        """
        if not isinstance(schema, dict):
            self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, root schema is not a dictionary")
        features_dict = schema.get(FEATURES_KEY)
        if not isinstance(features_dict, dict):
            self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, missing features dictionary")
        for feature_name, feature_dict_def in features_dict.items():
            self._validate_feature(feature_name, feature_dict_def)

Classes

class ACTION (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class ACTION(str, Enum):
    """Accepted values for the ``action`` key of a rule condition.

    Because ``str`` is mixed in, every member is also a plain string equal to
    its value.
    """
    # NOTE(review): the member names suggest string comparisons, but how each
    # action is evaluated is not defined in this module - confirm with the caller.
    EQUALS = "EQUALS"
    STARTSWITH = "STARTSWITH"
    ENDSWITH = "ENDSWITH"
    CONTAINS = "CONTAINS"

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var CONTAINS
var ENDSWITH
var EQUALS
var STARTSWITH
class SchemaValidator (logger: logging.Logger)
Expand source code
class SchemaValidator:
    """Validate the structure of a feature-toggles configuration dictionary.

    The checks below enforce this shape::

        {
            "features": {
                "<feature name>": {
                    "feature_default_value": <bool>,
                    "rules": [
                        {
                            "rule_name": "<non-empty string>",
                            "value_when_applies": <bool>,
                            "conditions": [
                                {"action": "<ACTION value>", "key": "<non-empty string>", "value": <non-empty>}
                            ]
                        }
                    ]
                }
            }
        }

    ``rules`` may be missing or empty; ``conditions`` must be a non-empty list.
    Every failed check is logged at error level and raised as
    ``ConfigurationError``.
    """

    def __init__(self, logger: Logger):
        """Store the logger that receives every validation error message."""
        self._logger = logger

    def _raise_conf_exc(self, error_str: str) -> None:
        """Log ``error_str`` at error level and raise it as ``ConfigurationError``.

        This helper always raises, so callers need no ``return`` after it.
        """
        self._logger.error(error_str)
        raise ConfigurationError(error_str)

    def _validate_condition(self, rule_name: str, condition: Dict[str, Any]) -> None:
        """Validate one condition of the rule named ``rule_name``.

        Raises:
            ConfigurationError: if the condition is not a non-empty dict, its
                action is not an ``ACTION`` value, its key is not a non-empty
                string, or its value is missing/empty.
        """
        if not condition or not isinstance(condition, dict):
            self._raise_conf_exc(f"invalid condition type, not a dictionary, rule_name={rule_name}")
        action = condition.get(CONDITION_ACTION, "")
        if action not in [member.value for member in ACTION]:
            self._raise_conf_exc(f"invalid action value, rule_name={rule_name}, action={action}")
        key = condition.get(CONDITION_KEY, "")
        if not key or not isinstance(key, str):
            self._raise_conf_exc(f"invalid key value, key has to be a non empty string, rule_name={rule_name}")
        value = condition.get(CONDITION_VALUE, "")
        # A plain truthiness test would reject legitimate values such as 0, 0.0
        # and False. Numbers (bool is an int subclass) are therefore always
        # accepted; a missing key, None, "" and empty containers are rejected.
        if not value and not isinstance(value, (int, float)):
            self._raise_conf_exc(f"missing condition value, rule_name={rule_name}")

    def _validate_rule(self, feature_name: str, rule: Dict[str, Any]) -> None:
        """Validate one rule of the feature named ``feature_name``.

        Raises:
            ConfigurationError: if the rule is not a non-empty dict, its name is
                not a non-empty string, its ``value_when_applies`` is not a
                bool, its conditions are not a non-empty list, or any single
                condition is invalid.
        """
        if not rule or not isinstance(rule, dict):
            self._raise_conf_exc(f"feature rule is not a dictionary, feature_name={feature_name}")
        rule_name = rule.get(RULE_NAME_KEY)
        # ``not rule_name`` already covers both None and the empty string.
        if not rule_name or not isinstance(rule_name, str):
            self._raise_conf_exc(f"invalid rule_name, feature_name={feature_name}")
        rule_default_value = rule.get(RULE_DEFAULT_VALUE)
        if rule_default_value is None or not isinstance(rule_default_value, bool):
            self._raise_conf_exc(f"invalid rule_default_value, rule_name={rule_name}")
        conditions = rule.get(CONDITIONS_KEY, {})
        if not conditions or not isinstance(conditions, list):
            self._raise_conf_exc(f"invalid condition, rule_name={rule_name}")
        # validate conditions
        for condition in conditions:
            self._validate_condition(rule_name, condition)

    def _validate_feature(self, feature_name: str, feature_dict_def: Dict[str, Any]) -> None:
        """Validate the definition of the feature named ``feature_name``.

        Raises:
            ConfigurationError: if the definition is not a non-empty dict, its
                default value is not a bool, its rules are not a list, or any
                single rule is invalid.
        """
        if not feature_dict_def or not isinstance(feature_dict_def, dict):
            self._raise_conf_exc(f"invalid AWS AppConfig JSON schema detected, feature {feature_name} is invalid")
        feature_default_value = feature_dict_def.get(FEATURE_DEFAULT_VAL_KEY)
        if feature_default_value is None or not isinstance(feature_default_value, bool):
            self._raise_conf_exc(f"missing feature_default_value for feature, feature_name={feature_name}")
        # validate rules
        rules = feature_dict_def.get(RULES_KEY, [])
        # A missing or empty rules entry means the feature has no rules to check.
        if not rules:
            return
        if not isinstance(rules, list):
            self._raise_conf_exc(f"feature rules is not a list, feature_name={feature_name}")
        for rule in rules:
            self._validate_rule(feature_name, rule)

    def validate_json_schema(self, schema: Dict[str, Any]) -> None:
        """Validate a whole configuration dictionary.

        Args:
            schema: the configuration to check; must be a dict whose
                ``features`` entry is a dict of feature definitions.

        Raises:
            ConfigurationError: if the root or the ``features`` entry is not a
                dict, or any feature definition is invalid.
        """
        if not isinstance(schema, dict):
            self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, root schema is not a dictionary")
        features_dict = schema.get(FEATURES_KEY)
        if not isinstance(features_dict, dict):
            self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, missing features dictionary")
        for feature_name, feature_dict_def in features_dict.items():
            self._validate_feature(feature_name, feature_dict_def)

Methods

def validate_json_schema(self, schema: Dict[str, Any]) ‑> NoneType
Expand source code
def validate_json_schema(self, schema: Dict[str, Any]) -> None:
    """Check the root of the configuration, then every feature definition in it.

    The root must be a dict and its ``features`` entry must be a dict as well;
    each feature is then handed to ``_validate_feature``. Failures are reported
    through ``_raise_conf_exc``.
    """
    if not isinstance(schema, dict):
        self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, root schema is not a dictionary")
    features = schema.get(FEATURES_KEY)
    if isinstance(features, dict):
        for name, definition in features.items():
            self._validate_feature(name, definition)
        return None
    # Reached only when the features entry is absent or not a dictionary.
    return self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, missing features dictionary")