Module aws_lambda_powertools.utilities.feature_toggles

Advanced feature toggles utility

Expand source code
"""Advanced feature toggles utility
"""
from .appconfig_fetcher import AppConfigFetcher
from .configuration_store import ConfigurationStore
from .exceptions import ConfigurationError
from .schema import ACTION, SchemaValidator
from .schema_fetcher import SchemaFetcher

# Public API of the package: the names re-exported from the sub-modules imported above.
__all__ = [
    "ConfigurationError",
    "ConfigurationStore",
    "ACTION",
    "SchemaValidator",
    "AppConfigFetcher",
    "SchemaFetcher",
]

Sub-modules

aws_lambda_powertools.utilities.feature_toggles.appconfig_fetcher
aws_lambda_powertools.utilities.feature_toggles.configuration_store
aws_lambda_powertools.utilities.feature_toggles.exceptions
aws_lambda_powertools.utilities.feature_toggles.schema
aws_lambda_powertools.utilities.feature_toggles.schema_fetcher

Classes

class ACTION (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class ACTION(str, Enum):
    """Comparison actions a rule condition may use.

    Each member is also a ``str``; its value is the literal expected in the
    condition's action field of the schema.
    """

    # context value == condition value
    EQUALS = "EQUALS"
    # context value starts with the condition value
    STARTSWITH = "STARTSWITH"
    # context value ends with the condition value
    ENDSWITH = "ENDSWITH"
    # context value is contained in the condition value (``context in condition``)
    CONTAINS = "CONTAINS"

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var CONTAINS
var ENDSWITH
var EQUALS
var STARTSWITH
class AppConfigFetcher (environment: str, service: str, configuration_name: str, cache_seconds: int, config: Union[botocore.config.Config, NoneType] = None)

Helper class that provides a standard way to create an ABC using inheritance.

This class fetches JSON schemas from AWS AppConfig

Parameters

environment : str
what appconfig environment to use 'dev/test' etc.
service : str
what service name to use from the supplied environment
configuration_name : str
what configuration to take from the environment & service combination
cache_seconds : int
cache expiration time, how often to call AppConfig to fetch latest configuration
config : Optional[Config]
boto3 client configuration
Expand source code
class AppConfigFetcher(SchemaFetcher):
    """Schema fetcher that reads the feature toggles configuration from AWS AppConfig."""

    def __init__(
        self,
        environment: str,
        service: str,
        configuration_name: str,
        cache_seconds: int,
        config: Optional[Config] = None,
    ):
        """This class fetches JSON schemas from AWS AppConfig

        Parameters
        ----------
        environment: str
            what appconfig environment to use 'dev/test' etc.
        service: str
            what service name to use from the supplied environment
        configuration_name: str
            what configuration to take from the environment & service combination
        cache_seconds: int
            cache expiration time, how often to call AppConfig to fetch latest configuration
        config: Optional[Config]
            boto3 client configuration
        """
        super().__init__(configuration_name, cache_seconds)
        self._logger = logger
        # the service name is passed to the provider as the AppConfig application
        self._conf_store = AppConfigProvider(environment=environment, application=service, config=config)

    def get_json_configuration(self) -> Dict[str, Any]:
        """Get configuration string from AWS AppConfig and return the parsed JSON dictionary

        Raises
        ------
        ConfigurationError
            Any validation error or appconfig error that can occur; the original
            provider exception is attached as ``__cause__``

        Returns
        -------
        Dict[str, Any]
            parsed JSON dictionary
        """
        try:
            # the provider applies TRANSFORM_TYPE to the payload and receives
            # self._cache_seconds as the maximum age of a cached value
            return self._conf_store.get(
                name=self.configuration_name,
                transform=TRANSFORM_TYPE,
                max_age=self._cache_seconds,
            )
        except (GetParameterError, TransformParameterError) as exc:
            error_str = f"unable to get AWS AppConfig configuration file, exception={str(exc)}"
            self._logger.error(error_str)
            # chain explicitly so the provider error remains visible as the cause
            raise ConfigurationError(error_str) from exc

Ancestors

Methods

def get_json_configuration(self) ‑> Dict[str, Any]

Get configuration string from AWS AppConfig and return the parsed JSON dictionary

Raises

ConfigurationError
Any validation error or appconfig error that can occur

Returns

Dict[str, Any]
parsed JSON dictionary
Expand source code
def get_json_configuration(self) -> Dict[str, Any]:
    """Get configuration string from AWS AppConfig and return the parsed JSON dictionary

    Raises
    ------
    ConfigurationError
        Any validation error or appconfig error that can occur; the original
        provider exception is attached as ``__cause__``

    Returns
    -------
    Dict[str, Any]
        parsed JSON dictionary
    """
    try:
        # the provider applies TRANSFORM_TYPE to the payload and receives
        # self._cache_seconds as the maximum age of a cached value
        return self._conf_store.get(
            name=self.configuration_name,
            transform=TRANSFORM_TYPE,
            max_age=self._cache_seconds,
        )
    except (GetParameterError, TransformParameterError) as exc:
        error_str = f"unable to get AWS AppConfig configuration file, exception={str(exc)}"
        self._logger.error(error_str)
        # chain explicitly so the provider error remains visible as the cause
        raise ConfigurationError(error_str) from exc
class ConfigurationError (*args, **kwargs)

Raised when a configuration store fails to retrieve or parse the configuration

Expand source code
class ConfigurationError(Exception):
    """Raised when a configuration store fails to retrieve or parse the configuration"""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ConfigurationStore (schema_fetcher: SchemaFetcher)

constructor

Parameters

schema_fetcher : SchemaFetcher
A schema JSON fetcher, can be AWS AppConfig, Hashicorp Consul etc.
Expand source code
class ConfigurationStore:
    """Evaluate feature toggles against the schema returned by a schema fetcher."""

    def __init__(self, schema_fetcher: SchemaFetcher):
        """Create a store backed by the given schema fetcher.

        Parameters
        ----------
        schema_fetcher: SchemaFetcher
            A schema JSON fetcher, can be AWS AppConfig, Hashicorp Consul etc.
        """
        self._logger = logger
        self._schema_fetcher = schema_fetcher
        self._schema_validator = schema.SchemaValidator(self._logger)

    def _match_by_action(self, action: str, condition_value: Any, context_value: Any) -> bool:
        """Compare one context value with one condition value using ``action``.

        Returns False when ``context_value`` is falsy, when ``action`` is not a
        known action, or when the comparison itself raises (the error is logged).
        """
        if not context_value:
            return False
        # every callable receives (context_value, condition_value)
        mapping_by_action = {
            schema.ACTION.EQUALS.value: lambda a, b: a == b,
            schema.ACTION.STARTSWITH.value: lambda a, b: a.startswith(b),
            schema.ACTION.ENDSWITH.value: lambda a, b: a.endswith(b),
            schema.ACTION.CONTAINS.value: lambda a, b: a in b,
        }

        try:
            func = mapping_by_action.get(action, lambda a, b: False)
            return func(context_value, condition_value)
        except Exception as exc:
            # best effort: a value of an unexpected type must not break toggle evaluation
            self._logger.error(f"caught exception while matching action, action={action}, exception={str(exc)}")
            return False

    def _is_rule_matched(self, feature_name: str, rule: Dict[str, Any], rules_context: Dict[str, Any]) -> bool:
        """Return True only when every condition of ``rule`` matches ``rules_context``.

        A rule without conditions never matches.
        """
        rule_name = rule.get(schema.RULE_NAME_KEY, "")
        rule_default_value = rule.get(schema.RULE_DEFAULT_VALUE)
        conditions = cast(List[Dict], rule.get(schema.CONDITIONS_KEY))
        if not conditions:
            return False

        for condition in conditions:
            context_value = rules_context.get(str(condition.get(schema.CONDITION_KEY)))
            if not self._match_by_action(
                condition.get(schema.CONDITION_ACTION, ""),
                condition.get(schema.CONDITION_VALUE),
                context_value,
            ):
                self._logger.debug(
                    f"rule did not match action, rule_name={rule_name}, rule_default_value={rule_default_value}, "
                    f"feature_name={feature_name}, context_value={str(context_value)} "
                )
                # context doesn't match condition
                return False
        # the loop finished without a mismatch, so all conditions match
        self._logger.debug(
            f"rule matched, rule_name={rule_name}, rule_default_value={rule_default_value}, "
            f"feature_name={feature_name}"
        )
        return True

    def _handle_rules(
        self,
        *,
        feature_name: str,
        rules_context: Dict[str, Any],
        feature_default_value: bool,
        rules: List[Dict[str, Any]],
    ) -> bool:
        """Return the value of the first matching rule, else ``feature_default_value``.

        Rules are evaluated in list order; the feature default is used only
        after every rule has been tried.
        """
        for rule in rules:
            if self._is_rule_matched(feature_name, rule, rules_context):
                return bool(rule.get(schema.RULE_DEFAULT_VALUE))
        # no rule matched, return default value of feature
        self._logger.debug(
            f"no rule matched, returning default value of feature, feature_default_value={feature_default_value}, "
            f"feature_name={feature_name}"
        )
        return feature_default_value

    def get_configuration(self) -> Dict[str, Any]:
        """Fetch the configuration through the schema fetcher and validate it

        Raises
        ------
        ConfigurationError
            Any validation error or appconfig error that can occur

        Returns
        -------
        Dict[str, Any]
            parsed JSON dictionary
        """
        config = self._schema_fetcher.get_json_configuration()
        # validate schema
        self._schema_validator.validate_json_schema(config)
        return config

    def get_feature_toggle(
        self, *, feature_name: str, rules_context: Optional[Dict[str, Any]] = None, value_if_missing: bool
    ) -> bool:
        """Get a feature toggle boolean value. Value is calculated according to a set of rules and conditions.

        See below for explanation.

        Parameters
        ----------
        feature_name: str
            feature name that you wish to fetch
        rules_context: Optional[Dict[str, Any]]
            dict of attributes that you would like to match the rules
            against, can be `{'tenant_id': 'X', 'username': 'Y', 'region': 'Z'}` etc.
        value_if_missing: bool
            this will be the returned value in case the feature toggle doesn't exist in
            the schema or there has been an error while fetching the
            configuration from appconfig

        Returns
        -------
        bool
            calculated feature toggle value. several possibilities:
            1. if the feature doesn't appear in the schema or there has been an error fetching the
               configuration -> error/warning log would appear and value_if_missing is returned
            2. feature exists and has no rules or no rules have matched -> return feature_default_value of
               the defined feature
            3. feature exists and a rule matches -> rule_default_value of rule is returned
        """
        if rules_context is None:
            rules_context = {}

        try:
            toggles_dict: Dict[str, Any] = self.get_configuration()
        except ConfigurationError:
            self._logger.error("unable to get feature toggles JSON, returning provided value_if_missing value")
            return value_if_missing

        feature: Optional[Dict[str, Any]] = toggles_dict.get(schema.FEATURES_KEY, {}).get(feature_name)
        if feature is None:
            self._logger.warning(
                f"feature does not appear in configuration, using provided value_if_missing, "
                f"feature_name={feature_name}, value_if_missing={value_if_missing}"
            )
            return value_if_missing

        rules_list = feature.get(schema.RULES_KEY)
        feature_default_value = feature.get(schema.FEATURE_DEFAULT_VAL_KEY)
        if not rules_list:
            # no rules but has a value
            self._logger.debug(
                f"no rules found, returning feature default value, feature_name={feature_name}, "
                f"default_value={feature_default_value}"
            )
            return bool(feature_default_value)
        # look for first rule match
        self._logger.debug(
            f"looking for rule match,  feature_name={feature_name}, feature_default_value={feature_default_value}"
        )
        return self._handle_rules(
            feature_name=feature_name,
            rules_context=rules_context,
            feature_default_value=bool(feature_default_value),
            rules=cast(List, rules_list),
        )

    def get_all_enabled_feature_toggles(self, *, rules_context: Optional[Dict[str, Any]] = None) -> List[str]:
        """Get all enabled feature toggles while also taking into account rule_context
        (when a feature has defined rules)

        Parameters
        ----------
        rules_context: Optional[Dict[str, Any]]
            dict of attributes that you would like to match the rules
            against, can be `{'tenant_id': 'X', 'username': 'Y', 'region': 'Z'}` etc.

        Returns
        -------
        List[str]
            a list of all features name that are enabled by also taking into account
            rule_context (when a feature has defined rules); empty when the
            configuration cannot be fetched
        """
        if rules_context is None:
            rules_context = {}

        try:
            toggles_dict: Dict[str, Any] = self.get_configuration()
        except ConfigurationError:
            self._logger.error("unable to get feature toggles JSON")
            return []

        ret_list = []
        features: Dict[str, Any] = toggles_dict.get(schema.FEATURES_KEY, {})
        for feature_name, feature_dict_def in features.items():
            rules_list = feature_dict_def.get(schema.RULES_KEY, [])
            feature_default_value = feature_dict_def.get(schema.FEATURE_DEFAULT_VAL_KEY)
            if feature_default_value and not rules_list:
                self._logger.debug(
                    f"feature is enabled by default and has no defined rules, feature_name={feature_name}"
                )
                ret_list.append(feature_name)
            elif self._handle_rules(
                feature_name=feature_name,
                rules_context=rules_context,
                feature_default_value=bool(feature_default_value),
                rules=rules_list,
            ):
                self._logger.debug(f"feature's calculated value is True, feature_name={feature_name}")
                ret_list.append(feature_name)

        return ret_list

Methods

def get_all_enabled_feature_toggles(self, *, rules_context: Union[Dict[str, Any], NoneType] = None) ‑> List[str]

Get all enabled feature toggles while also taking into account rule_context (when a feature has defined rules)

Parameters

rules_context : Optional[Dict[str, Any]]
dict of attributes that you would like to match the rules against, e.g. {'tenant_id': 'X', 'username': 'Y', 'region': 'Z'}

Returns

List[str]
a list of all features name that are enabled by also taking into account rule_context (when a feature has defined rules)
Expand source code
def get_all_enabled_feature_toggles(self, *, rules_context: Optional[Dict[str, Any]] = None) -> List[str]:
    """List the names of every feature whose calculated value is truthy.

    Parameters
    ----------
    rules_context: Optional[Dict[str, Any]]
        attributes the feature rules are matched against; an empty dict is
        used when omitted

    Returns
    -------
    List[str]
        names of the enabled features, in configuration order; an empty list
        when the configuration cannot be fetched
    """
    context = {} if rules_context is None else rules_context

    try:
        configuration: Dict[str, Any] = self.get_configuration()
    except ConfigurationError:
        logger.error("unable to get feature toggles JSON")
        return []

    enabled: List[str] = []
    definitions: Dict[str, Any] = configuration.get(schema.FEATURES_KEY, {})
    for name, definition in definitions.items():
        rules = definition.get(schema.RULES_KEY, [])
        default_value = definition.get(schema.FEATURE_DEFAULT_VAL_KEY)

        if default_value and not rules:
            # enabled outright: truthy default and nothing that could override it
            self._logger.debug(f"feature is enabled by default and has no defined rules, feature_name={name}")
            enabled.append(name)
            continue

        is_on = self._handle_rules(
            feature_name=name,
            rules_context=context,
            feature_default_value=default_value,
            rules=rules,
        )
        if is_on:
            self._logger.debug(f"feature's calculated value is True, feature_name={name}")
            enabled.append(name)

    return enabled
def get_configuration(self) ‑> Dict[str, Any]

Get configuration string from AWS AppConfig and return the parsed JSON dictionary

Raises

ConfigurationError
Any validation error or appconfig error that can occur

Returns

Dict[str, Any]
parsed JSON dictionary
Expand source code
def get_configuration(self) -> Dict[str, Any]:
    """Fetch the configuration through the schema fetcher and validate it.

    Raises
    ------
    ConfigurationError
        Any validation error or appconfig error that can occur

    Returns
    -------
    Dict[str, Any]
        the dictionary returned by the schema fetcher, unchanged
    """
    fetched = self._schema_fetcher.get_json_configuration()
    # validation happens before the configuration is handed to the caller
    self._schema_validator.validate_json_schema(fetched)
    return fetched
def get_feature_toggle(self, *, feature_name: str, rules_context: Union[Dict[str, Any], NoneType] = None, value_if_missing: bool) ‑> bool

Get a feature toggle boolean value. Value is calculated according to a set of rules and conditions.

See below for explanation.

Parameters

feature_name : str
feature name that you wish to fetch
rules_context : Optional[Dict[str, Any]]
dict of attributes that you would like to match the rules against, e.g. {'tenant_id': 'X', 'username': 'Y', 'region': 'Z'}
value_if_missing : bool
this will be the returned value in case the feature toggle doesn't exist in the schema or there has been an error while fetching the configuration from appconfig

Returns

bool
calculated feature toggle value. There are several possibilities: (1) if the feature doesn't appear in the schema, or there has been an error fetching the configuration, an error/warning is logged and value_if_missing is returned; (2) if the feature exists and has no rules, or no rules have matched, the feature_default_value of the defined feature is returned; (3) if the feature exists and a rule matches, the rule_default_value of that rule is returned.
Expand source code
def get_feature_toggle(
    self, *, feature_name: str, rules_context: Optional[Dict[str, Any]] = None, value_if_missing: bool
) -> bool:
    """Calculate the boolean value of a single feature toggle.

    Parameters
    ----------
    feature_name: str
        name of the feature to evaluate
    rules_context: Optional[Dict[str, Any]]
        attributes the feature rules are matched against; an empty dict is
        used when omitted
    value_if_missing: bool
        returned when the configuration cannot be fetched or the feature is
        not present in it

    Returns
    -------
    bool
        - ``value_if_missing`` when fetching fails or the feature is absent
          (an error/warning is logged)
        - the feature default value when the feature has no rules
        - otherwise whatever rule evaluation returns
    """
    context = rules_context if rules_context is not None else {}

    try:
        configuration: Dict[str, Any] = self.get_configuration()
    except ConfigurationError:
        logger.error("unable to get feature toggles JSON, returning provided value_if_missing value")
        return value_if_missing

    all_features = configuration.get(schema.FEATURES_KEY, {})
    feature = all_features.get(feature_name, None)
    if feature is None:
        logger.warning(
            f"feature does not appear in configuration, using provided value_if_missing, "
            f"feature_name={feature_name}, value_if_missing={value_if_missing}"
        )
        return value_if_missing

    rules_list = feature.get(schema.RULES_KEY)
    feature_default_value = feature.get(schema.FEATURE_DEFAULT_VAL_KEY)

    if rules_list:
        # rules exist: delegate to rule evaluation
        logger.debug(
            f"looking for rule match,  feature_name={feature_name}, feature_default_value={feature_default_value}"
        )
        return self._handle_rules(
            feature_name=feature_name,
            rules_context=context,
            feature_default_value=bool(feature_default_value),
            rules=cast(List, rules_list),
        )

    # no rules: the feature default is the answer
    logger.debug(
        f"no rules found, returning feature default value, feature_name={feature_name}, "
        f"default_value={feature_default_value}"
    )
    return bool(feature_default_value)
class SchemaFetcher (configuration_name: str, cache_seconds: int)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class SchemaFetcher(ABC):
    """Abstract base for objects that fetch the feature toggles configuration.

    Subclasses must implement ``get_json_configuration``.
    """

    def __init__(self, configuration_name: str, cache_seconds: int):
        """Store the settings shared by all fetchers.

        Parameters
        ----------
        configuration_name: str
            name of the configuration to fetch (public attribute)
        cache_seconds: int
            cache duration in seconds, kept in ``_cache_seconds``
        """
        self._cache_seconds = cache_seconds
        self.configuration_name = configuration_name

    @abstractmethod
    def get_json_configuration(self) -> Dict[str, Any]:
        """Fetch the configuration and return it as a parsed JSON dictionary.

        Raises
        ------
        ConfigurationError
            Any error that can occur during schema fetch or JSON parse

        Returns
        -------
        Dict[str, Any]
            parsed JSON dictionary
        """
        return NotImplemented  # pragma: no cover

Ancestors

  • abc.ABC

Subclasses

Methods

def get_json_configuration(self) ‑> Dict[str, Any]

Get configuration string from any configuration storing service and return the parsed JSON dictionary

Raises

ConfigurationError
Any error that can occur during schema fetch or JSON parse

Returns

Dict[str, Any]
parsed JSON dictionary
Expand source code
@abstractmethod
def get_json_configuration(self) -> Dict[str, Any]:
    """Fetch the configuration and return it as a parsed JSON dictionary.

    This is the abstract hook concrete fetchers override; the base
    implementation only returns ``NotImplemented``.

    Raises
    ------
    ConfigurationError
        Any error that can occur during schema fetch or JSON parse

    Returns
    -------
    Dict[str, Any]
        parsed JSON dictionary
    """
    return NotImplemented  # pragma: no cover
class SchemaValidator (logger: logging.Logger)
Expand source code
class SchemaValidator:
    """Validates the structure of a feature toggles schema dictionary.

    Every problem found is logged and raised as ``ConfigurationError``.
    """

    def __init__(self, logger: Logger):
        self._logger = logger

    def _raise_conf_exc(self, error_str: str) -> None:
        """Log ``error_str`` as an error, then raise it as ``ConfigurationError``."""
        self._logger.error(error_str)
        raise ConfigurationError(error_str)

    def _validate_condition(self, rule_name: str, condition: Dict[str, str]) -> None:
        """Check one condition: known action, non-empty string key, truthy value."""
        if not (condition and isinstance(condition, dict)):
            self._raise_conf_exc(f"invalid condition type, not a dictionary, rule_name={rule_name}")

        allowed_actions = (
            ACTION.EQUALS.value,
            ACTION.STARTSWITH.value,
            ACTION.ENDSWITH.value,
            ACTION.CONTAINS.value,
        )
        action = condition.get(CONDITION_ACTION, "")
        if action not in allowed_actions:
            self._raise_conf_exc(f"invalid action value, rule_name={rule_name}, action={action}")

        key = condition.get(CONDITION_KEY, "")
        if not (key and isinstance(key, str)):
            self._raise_conf_exc(f"invalid key value, key has to be a non empty string, rule_name={rule_name}")

        if not condition.get(CONDITION_VALUE, ""):
            self._raise_conf_exc(f"missing condition value, rule_name={rule_name}")

    def _validate_rule(self, feature_name: str, rule: Dict[str, Any]) -> None:
        """Check one rule: string name, boolean value, non-empty list of valid conditions."""
        if not (rule and isinstance(rule, dict)):
            self._raise_conf_exc(f"feature rule is not a dictionary, feature_name={feature_name}")

        rule_name = rule.get(RULE_NAME_KEY)
        # None is falsy, so the truthiness test already rejects a missing name
        if not (rule_name and isinstance(rule_name, str)):
            return self._raise_conf_exc(f"invalid rule_name, feature_name={feature_name}")

        # isinstance(None, bool) is False, so a missing value is rejected here too
        if not isinstance(rule.get(RULE_DEFAULT_VALUE), bool):
            self._raise_conf_exc(f"invalid rule_default_value, rule_name={rule_name}")

        conditions = rule.get(CONDITIONS_KEY, {})
        if not (conditions and isinstance(conditions, list)):
            self._raise_conf_exc(f"invalid condition, rule_name={rule_name}")

        # validate conditions
        for entry in conditions:
            self._validate_condition(rule_name, entry)

    def _validate_feature(self, feature_name: str, feature_dict_def: Dict[str, Any]) -> None:
        """Check one feature: dictionary with a boolean default and, optionally, a list of rules."""
        if not (feature_dict_def and isinstance(feature_dict_def, dict)):
            self._raise_conf_exc(f"invalid AWS AppConfig JSON schema detected, feature {feature_name} is invalid")

        if not isinstance(feature_dict_def.get(FEATURE_DEFAULT_VAL_KEY), bool):
            self._raise_conf_exc(f"missing feature_default_value for feature, feature_name={feature_name}")

        # validate rules; a feature without rules is valid
        rules = feature_dict_def.get(RULES_KEY, [])
        if rules:
            if not isinstance(rules, list):
                self._raise_conf_exc(f"feature rules is not a list, feature_name={feature_name}")
            for entry in rules:
                self._validate_rule(feature_name, entry)

    def validate_json_schema(self, schema: Dict[str, Any]) -> None:
        """Validate the whole schema: a dictionary holding a dictionary of features.

        Raises
        ------
        ConfigurationError
            when the root, the features dictionary, or any feature is invalid
        """
        if not isinstance(schema, dict):
            self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, root schema is not a dictionary")

        features = schema.get(FEATURES_KEY)
        if isinstance(features, dict):
            for name, definition in features.items():
                self._validate_feature(name, definition)
            return None
        return self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, missing features dictionary")

Methods

def validate_json_schema(self, schema: Dict[str, Any]) ‑> NoneType
Expand source code
def validate_json_schema(self, schema: Dict[str, Any]) -> None:
    """Validate the whole schema: a dictionary holding a dictionary of features.

    Each feature found is passed to ``_validate_feature``; structural problems
    at the root are reported through ``_raise_conf_exc``.
    """
    if not isinstance(schema, dict):
        self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, root schema is not a dictionary")

    features = schema.get(FEATURES_KEY)
    if isinstance(features, dict):
        for name, definition in features.items():
            self._validate_feature(name, definition)
        return None
    return self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, missing features dictionary")