Module aws_lambda_powertools.utilities.feature_toggles
Advanced feature toggles utility
Expand source code
"""Advanced feature toggles utility
"""
from .appconfig_fetcher import AppConfigFetcher
from .configuration_store import ConfigurationStore
from .exceptions import ConfigurationError
from .schema import ACTION, SchemaValidator
from .schema_fetcher import SchemaFetcher
# Public names exported by `from aws_lambda_powertools.utilities.feature_toggles import *`;
# every entry is one of the names imported from the sub-modules above.
__all__ = [
"ConfigurationError",
"ConfigurationStore",
"ACTION",
"SchemaValidator",
"AppConfigFetcher",
"SchemaFetcher",
]
Sub-modules
aws_lambda_powertools.utilities.feature_toggles.appconfig_fetcher
aws_lambda_powertools.utilities.feature_toggles.configuration_store
aws_lambda_powertools.utilities.feature_toggles.exceptions
aws_lambda_powertools.utilities.feature_toggles.schema
aws_lambda_powertools.utilities.feature_toggles.schema_fetcher
Classes
class ACTION (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code
class ACTION(str, Enum):
    """Names of the comparison actions a rule condition can request.

    Each member is also a ``str``, so it compares equal to its own value.
    """

    EQUALS = "EQUALS"
    STARTSWITH = "STARTSWITH"
    ENDSWITH = "ENDSWITH"
    CONTAINS = "CONTAINS"
Ancestors
- builtins.str
- enum.Enum
Class variables
var CONTAINS
var ENDSWITH
var EQUALS
var STARTSWITH
class AppConfigFetcher (environment: str, service: str, configuration_name: str, cache_seconds: int, config: Union[botocore.config.Config, NoneType] = None)
-
Helper class that provides a standard way to create an ABC using inheritance.
This class fetches JSON schemas from AWS AppConfig
Parameters
environment
:str
- what appconfig environment to use 'dev/test' etc.
service
:str
- what service name to use from the supplied environment
configuration_name
:str
- what configuration to take from the environment & service combination
cache_seconds
:int
- cache expiration time, how often to call AppConfig to fetch latest configuration
config
:Optional[Config]
- boto3 client configuration
Expand source code
class AppConfigFetcher(SchemaFetcher):
    """Schema fetcher that reads the configuration from AWS AppConfig."""

    def __init__(
        self,
        environment: str,
        service: str,
        configuration_name: str,
        cache_seconds: int,
        config: Optional[Config] = None,
    ):
        """This class fetches JSON schemas from AWS AppConfig

        Parameters
        ----------
        environment: str
            what appconfig environment to use 'dev/test' etc.
        service: str
            what service name to use from the supplied environment
        configuration_name: str
            what configuration to take from the environment & service combination
        cache_seconds: int
            cache expiration time, how often to call AppConfig to fetch latest configuration
        config: Optional[Config]
            boto3 client configuration
        """
        super().__init__(configuration_name, cache_seconds)
        self._logger = logger
        # the service name is passed to the provider as its AppConfig "application"
        self._conf_store = AppConfigProvider(environment=environment, application=service, config=config)

    def get_json_configuration(self) -> Dict[str, Any]:
        """Get configuration string from AWS AppConfig and return the parsed JSON dictionary

        Raises
        ------
        ConfigurationError
            Any validation error or appconfig error that can occur; the original
            provider exception is attached as ``__cause__``

        Returns
        -------
        Dict[str, Any]
            parsed JSON dictionary
        """
        try:
            # the provider applies TRANSFORM_TYPE to the payload and caches the
            # result for self._cache_seconds seconds
            return self._conf_store.get(
                name=self.configuration_name,
                transform=TRANSFORM_TYPE,
                max_age=self._cache_seconds,
            )
        except (GetParameterError, TransformParameterError) as exc:
            error_str = f"unable to get AWS AppConfig configuration file, exception={str(exc)}"
            self._logger.error(error_str)
            # chain explicitly so callers can inspect the underlying provider error
            raise ConfigurationError(error_str) from exc
Ancestors
- SchemaFetcher
- abc.ABC
Methods
def get_json_configuration(self) ‑> Dict[str, Any]
-
Get configuration string from AWS AppConfig and return the parsed JSON dictionary
Raises
ConfigurationError
- Any validation error or appconfig error that can occur
Returns
Dict[str, Any]
- parsed JSON dictionary
Expand source code
def get_json_configuration(self) -> Dict[str, Any]:
    """Get configuration string from AWS AppConfig and return the parsed JSON dictionary

    Raises
    ------
    ConfigurationError
        Any validation error or appconfig error that can occur; the original
        provider exception is attached as ``__cause__``

    Returns
    -------
    Dict[str, Any]
        parsed JSON dictionary
    """
    try:
        # the provider applies TRANSFORM_TYPE to the payload and caches the
        # result for self._cache_seconds seconds
        return self._conf_store.get(
            name=self.configuration_name,
            transform=TRANSFORM_TYPE,
            max_age=self._cache_seconds,
        )
    except (GetParameterError, TransformParameterError) as exc:
        error_str = f"unable to get AWS AppConfig configuration file, exception={str(exc)}"
        self._logger.error(error_str)
        # chain explicitly so callers can inspect the underlying provider error
        raise ConfigurationError(error_str) from exc
class ConfigurationError (*args, **kwargs)
-
When a configuration store raises an exception on config retrieval or parsing
Expand source code
class ConfigurationError(Exception):
    """Raised when a configuration store fails on config retrieval or parsing."""
Ancestors
- builtins.Exception
- builtins.BaseException
class ConfigurationStore (schema_fetcher: SchemaFetcher)
-
constructor
Parameters
schema_fetcher
:SchemaFetcher
- A schema JSON fetcher, can be AWS AppConfig, Hashicorp Consul etc.
Expand source code
class ConfigurationStore:
    """Evaluates feature toggles from a schema obtained through a ``SchemaFetcher``."""

    def __init__(self, schema_fetcher: SchemaFetcher):
        """constructor

        Parameters
        ----------
        schema_fetcher: SchemaFetcher
            A schema JSON fetcher, can be AWS AppConfig, Hashicorp Consul etc.
        """
        self._logger = logger
        self._schema_fetcher = schema_fetcher
        self._schema_validator = schema.SchemaValidator(self._logger)

    def _match_by_action(self, action: str, condition_value: Any, context_value: Any) -> bool:
        """Return True when ``context_value`` satisfies ``action`` against ``condition_value``.

        A falsy context value, an unknown action, or an exception raised by the
        comparison itself all yield False.
        """
        if not context_value:
            return False
        # every comparison receives (context_value, condition_value) as (a, b);
        # CONTAINS therefore tests ``context_value in condition_value``
        mapping_by_action = {
            schema.ACTION.EQUALS.value: lambda a, b: a == b,
            schema.ACTION.STARTSWITH.value: lambda a, b: a.startswith(b),
            schema.ACTION.ENDSWITH.value: lambda a, b: a.endswith(b),
            schema.ACTION.CONTAINS.value: lambda a, b: a in b,
        }

        try:
            func = mapping_by_action.get(action, lambda a, b: False)
            return func(context_value, condition_value)
        except Exception as exc:
            # best effort: a comparison that cannot be evaluated counts as "no match"
            self._logger.error(f"caught exception while matching action, action={action}, exception={str(exc)}")
            return False

    def _is_rule_matched(self, feature_name: str, rule: Dict[str, Any], rules_context: Dict[str, Any]) -> bool:
        """Return True only when every condition of ``rule`` matches ``rules_context``."""
        rule_name = rule.get(schema.RULE_NAME_KEY, "")
        rule_default_value = rule.get(schema.RULE_DEFAULT_VALUE)
        conditions = cast(List[Dict], rule.get(schema.CONDITIONS_KEY))

        for condition in conditions:
            context_value = rules_context.get(str(condition.get(schema.CONDITION_KEY)))
            if not self._match_by_action(
                condition.get(schema.CONDITION_ACTION, ""),
                condition.get(schema.CONDITION_VALUE),
                context_value,
            ):
                logger.debug(
                    f"rule did not match action, rule_name={rule_name}, rule_default_value={rule_default_value}, "
                    f"feature_name={feature_name}, context_value={str(context_value)} "
                )
                # context doesn't match condition
                return False
        # if we got here, all conditions match
        logger.debug(
            f"rule matched, rule_name={rule_name}, rule_default_value={rule_default_value}, "
            f"feature_name={feature_name}"
        )
        return True

    def _handle_rules(
        self,
        *,
        feature_name: str,
        rules_context: Dict[str, Any],
        feature_default_value: bool,
        rules: List[Dict[str, Any]],
    ) -> bool:
        """Return the value of the first matching rule, or ``feature_default_value`` when none match."""
        for rule in rules:
            rule_default_value = rule.get(schema.RULE_DEFAULT_VALUE)
            if self._is_rule_matched(feature_name, rule, rules_context):
                return bool(rule_default_value)
        # no rule matched, return default value of feature
        logger.debug(
            f"no rule matched, returning default value of feature, feature_default_value={feature_default_value}, "
            f"feature_name={feature_name}"
        )
        return feature_default_value

    def get_configuration(self) -> Dict[str, Any]:
        """Get the configuration from the schema fetcher, validate it and return the parsed JSON dictionary

        Raises
        ------
        ConfigurationError
            Any validation error or appconfig error that can occur

        Returns
        -------
        Dict[str, Any]
            parsed JSON dictionary
        """
        # caching, if any, is the fetcher's responsibility
        config = self._schema_fetcher.get_json_configuration()
        # validate schema
        self._schema_validator.validate_json_schema(config)
        return config

    def get_feature_toggle(
        self, *, feature_name: str, rules_context: Optional[Dict[str, Any]] = None, value_if_missing: bool
    ) -> bool:
        """Get a feature toggle boolean value. Value is calculated according to a set of rules and conditions.

        See below for explanation.

        Parameters
        ----------
        feature_name: str
            feature name that you wish to fetch
        rules_context: Optional[Dict[str, Any]]
            dict of attributes that you would like to match the rules
            against, can be `{'tenant_id': 'X', 'username': 'Y', 'region': 'Z'}` etc.
        value_if_missing: bool
            this will be the returned value in case the feature toggle doesn't exist in
            the schema or there has been an error while fetching the
            configuration from appconfig

        Returns
        -------
        bool
            calculated feature toggle value. several possibilities:
            1. if the feature doesn't appear in the schema or there has been an error fetching the
               configuration -> error/warning log would appear and value_if_missing is returned
            2. feature exists and has no rules or no rules have matched -> return feature_default_value of
               the defined feature
            3. feature exists and a rule matches -> rule_default_value of rule is returned
        """
        if rules_context is None:
            rules_context = {}

        try:
            toggles_dict: Dict[str, Any] = self.get_configuration()
        except ConfigurationError:
            logger.error("unable to get feature toggles JSON, returning provided value_if_missing value")
            return value_if_missing

        feature: Dict[str, Dict] = toggles_dict.get(schema.FEATURES_KEY, {}).get(feature_name, None)
        if feature is None:
            logger.warning(
                f"feature does not appear in configuration, using provided value_if_missing, "
                f"feature_name={feature_name}, value_if_missing={value_if_missing}"
            )
            return value_if_missing

        rules_list = feature.get(schema.RULES_KEY)
        feature_default_value = feature.get(schema.FEATURE_DEFAULT_VAL_KEY)
        if not rules_list:
            # not rules but has a value
            logger.debug(
                f"no rules found, returning feature default value, feature_name={feature_name}, "
                f"default_value={feature_default_value}"
            )
            return bool(feature_default_value)
        # look for first rule match
        logger.debug(
            f"looking for rule match, feature_name={feature_name}, feature_default_value={feature_default_value}"
        )
        return self._handle_rules(
            feature_name=feature_name,
            rules_context=rules_context,
            feature_default_value=bool(feature_default_value),
            rules=cast(List, rules_list),
        )

    def get_all_enabled_feature_toggles(self, *, rules_context: Optional[Dict[str, Any]] = None) -> List[str]:
        """Get all enabled feature toggles while also taking into account rule_context
        (when a feature has defined rules)

        Parameters
        ----------
        rules_context: Optional[Dict[str, Any]]
            dict of attributes that you would like to match the rules
            against, can be `{'tenant_id': 'X', 'username': 'Y', 'region': 'Z'}` etc.

        Returns
        -------
        List[str]
            a list of all feature names that are enabled, also taking into account
            rule_context (when a feature has defined rules); empty when the
            configuration cannot be fetched
        """
        if rules_context is None:
            rules_context = {}

        try:
            toggles_dict: Dict[str, Any] = self.get_configuration()
        except ConfigurationError:
            logger.error("unable to get feature toggles JSON")
            return []

        ret_list = []
        features: Dict[str, Any] = toggles_dict.get(schema.FEATURES_KEY, {})
        for feature_name, feature_dict_def in features.items():
            # `or []` also covers an explicit null "rules" entry, which would
            # otherwise reach _handle_rules as None and fail on iteration
            rules_list = feature_dict_def.get(schema.RULES_KEY) or []
            feature_default_value = feature_dict_def.get(schema.FEATURE_DEFAULT_VAL_KEY)
            if feature_default_value and not rules_list:
                self._logger.debug(
                    f"feature is enabled by default and has no defined rules, feature_name={feature_name}"
                )
                ret_list.append(feature_name)
            elif self._handle_rules(
                feature_name=feature_name,
                rules_context=rules_context,
                # coerce like get_feature_toggle does; _handle_rules expects a bool
                feature_default_value=bool(feature_default_value),
                rules=rules_list,
            ):
                self._logger.debug(f"feature's calculated value is True, feature_name={feature_name}")
                ret_list.append(feature_name)
        return ret_list
Methods
def get_all_enabled_feature_toggles(self, *, rules_context: Union[Dict[str, Any], NoneType] = None) ‑> List[str]
-
Get all enabled feature toggles while also taking into account rule_context (when a feature has defined rules)
Parameters
rules_context
:Optional[Dict[str, Any]]
- dict of attributes that you would like to match the rules
against, can be
{'tenant_id': 'X', 'username': 'Y', 'region': 'Z'}
etc.
Returns
List[str]
- a list of all feature names that are enabled, also taking into account rule_context (when a feature has defined rules)
Expand source code
def get_all_enabled_feature_toggles(self, *, rules_context: Optional[Dict[str, Any]] = None) -> List[str]:
    """Get all enabled feature toggles while also taking into account rule_context
    (when a feature has defined rules)

    Parameters
    ----------
    rules_context: Optional[Dict[str, Any]]
        dict of attributes that you would like to match the rules
        against, can be `{'tenant_id': 'X', 'username': 'Y', 'region': 'Z'}` etc.

    Returns
    -------
    List[str]
        a list of all feature names that are enabled, also taking into account
        rule_context (when a feature has defined rules); empty when the
        configuration cannot be fetched
    """
    if rules_context is None:
        rules_context = {}

    try:
        toggles_dict: Dict[str, Any] = self.get_configuration()
    except ConfigurationError:
        logger.error("unable to get feature toggles JSON")
        return []

    ret_list = []
    features: Dict[str, Any] = toggles_dict.get(schema.FEATURES_KEY, {})
    for feature_name, feature_dict_def in features.items():
        # `or []` also covers an explicit null "rules" entry, which would
        # otherwise reach _handle_rules as None and fail on iteration
        rules_list = feature_dict_def.get(schema.RULES_KEY) or []
        feature_default_value = feature_dict_def.get(schema.FEATURE_DEFAULT_VAL_KEY)
        if feature_default_value and not rules_list:
            self._logger.debug(
                f"feature is enabled by default and has no defined rules, feature_name={feature_name}"
            )
            ret_list.append(feature_name)
        elif self._handle_rules(
            feature_name=feature_name,
            rules_context=rules_context,
            # coerce like get_feature_toggle does; _handle_rules expects a bool
            feature_default_value=bool(feature_default_value),
            rules=rules_list,
        ):
            self._logger.debug(f"feature's calculated value is True, feature_name={feature_name}")
            ret_list.append(feature_name)
    return ret_list
def get_configuration(self) ‑> Dict[str, Any]
-
Get configuration string from AWS AppConfig and return the parsed JSON dictionary
Raises
ConfigurationError
- Any validation error or appconfig error that can occur
Returns
Dict[str, Any]
- parsed JSON dictionary
Expand source code
def get_configuration(self) -> Dict[str, Any]:
    """Fetch the configuration through the schema fetcher, validate it and return it

    Raises
    ------
    ConfigurationError
        Any validation error or appconfig error that can occur

    Returns
    -------
    Dict[str, Any]
        parsed JSON dictionary
    """
    # the fetcher hands back the already-parsed configuration
    fetched = self._schema_fetcher.get_json_configuration()

    # the validator raises on a malformed schema; its return value is not used
    self._schema_validator.validate_json_schema(fetched)
    return fetched
def get_feature_toggle(self, *, feature_name: str, rules_context: Union[Dict[str, Any], NoneType] = None, value_if_missing: bool) ‑> bool
-
Get a feature toggle boolean value. Value is calculated according to a set of rules and conditions.
See below for explanation.
Parameters
feature_name
:str
- feature name that you wish to fetch
rules_context
:Optional[Dict[str, Any]]
- dict of attributes that you would like to match the rules against, can be {'tenant_id': 'X', 'username': 'Y', 'region': 'Z'} etc.
value_if_missing
:bool
- this will be the returned value in case the feature toggle doesn't exist in the schema or there has been an error while fetching the configuration from appconfig
Returns
bool
- calculated feature toggle value. several possibilities: 1. if the feature doesn't appear in the schema or there has been an error fetching the configuration -> error/warning log would appear and value_if_missing is returned 2. feature exists and has no rules or no rules have matched -> return feature_default_value of the defined feature 3. feature exists and a rule matches -> rule_default_value of rule is returned
Expand source code
def get_feature_toggle(
    self, *, feature_name: str, rules_context: Optional[Dict[str, Any]] = None, value_if_missing: bool
) -> bool:
    """Compute the boolean value of one feature toggle from its rules and conditions.

    Parameters
    ----------
    feature_name: str
        name of the feature to evaluate
    rules_context: Optional[Dict[str, Any]]
        attributes to match the rules against,
        e.g. `{'tenant_id': 'X', 'username': 'Y', 'region': 'Z'}`
    value_if_missing: bool
        value returned when the feature is absent from the schema or the
        configuration could not be fetched

    Returns
    -------
    bool
        one of:
        1. value_if_missing, when the configuration cannot be fetched (error log) or the
           feature is not in the schema (warning log)
        2. the feature's own default value, when it has no rules or no rule matched
        3. the matching rule's value, when a rule matches
    """
    context: Dict[str, Any] = {} if rules_context is None else rules_context

    try:
        configuration: Dict[str, Any] = self.get_configuration()
    except ConfigurationError:
        logger.error("unable to get feature toggles JSON, returning provided value_if_missing value")
        return value_if_missing

    all_features = configuration.get(schema.FEATURES_KEY, {})
    feature_def: Dict[str, Dict] = all_features.get(feature_name, None)
    if feature_def is None:
        logger.warning(
            f"feature does not appear in configuration, using provided value_if_missing, "
            f"feature_name={feature_name}, value_if_missing={value_if_missing}"
        )
        return value_if_missing

    rules = feature_def.get(schema.RULES_KEY)
    feature_default_value = feature_def.get(schema.FEATURE_DEFAULT_VAL_KEY)

    if rules:
        # the first matching rule wins; otherwise the feature default is used
        logger.debug(
            f"looking for rule match, feature_name={feature_name}, feature_default_value={feature_default_value}"
        )
        return self._handle_rules(
            feature_name=feature_name,
            rules_context=context,
            feature_default_value=bool(feature_default_value),
            rules=cast(List, rules),
        )

    # no rules defined: the feature's own default decides
    logger.debug(
        f"no rules found, returning feature default value, feature_name={feature_name}, "
        f"default_value={feature_default_value}"
    )
    return bool(feature_default_value)
class SchemaFetcher (configuration_name: str, cache_seconds: int)
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class SchemaFetcher(ABC):
    """Abstract base for classes that fetch a configuration schema as a parsed dictionary."""

    def __init__(self, configuration_name: str, cache_seconds: int):
        """Store the settings shared by all fetchers.

        Parameters
        ----------
        configuration_name: str
            name of the configuration to fetch
        cache_seconds: int
            cache duration in seconds, for use by the concrete fetcher
        """
        self.configuration_name = configuration_name
        self._cache_seconds = cache_seconds

    @abstractmethod
    def get_json_configuration(self) -> Dict[str, Any]:
        """Get configuration string from any configuration storing service and return the parsed JSON dictionary

        Raises
        ------
        ConfigurationError
            Any error that can occur during schema fetch or JSON parse
        NotImplementedError
            When a subclass delegates to this base implementation

        Returns
        -------
        Dict[str, Any]
            parsed JSON dictionary
        """
        # NotImplemented is the sentinel for binary special methods; an abstract
        # method reached through super() must fail loudly instead
        raise NotImplementedError()  # pragma: no cover
Ancestors
- abc.ABC
Subclasses
Methods
def get_json_configuration(self) ‑> Dict[str, Any]
-
Get configuration string from any configuration storing service and return the parsed JSON dictionary
Raises
ConfigurationError
- Any error that can occur during schema fetch or JSON parse
Returns
Dict[str, Any]
- parsed JSON dictionary
Expand source code
@abstractmethod
def get_json_configuration(self) -> Dict[str, Any]:
    """Get configuration string from any configuration storing service and return the parsed JSON dictionary

    Raises
    ------
    ConfigurationError
        Any error that can occur during schema fetch or JSON parse
    NotImplementedError
        When a subclass delegates to this base implementation

    Returns
    -------
    Dict[str, Any]
        parsed JSON dictionary
    """
    # NotImplemented is the sentinel for binary special methods; an abstract
    # method reached through super() must fail loudly instead
    raise NotImplementedError()  # pragma: no cover
class SchemaValidator (logger: logging.Logger)
-
Expand source code
class SchemaValidator:
    """Checks the structure of a feature toggles schema: features, their rules and the rules' conditions."""

    def __init__(self, logger: Logger):
        self._logger = logger

    def _raise_conf_exc(self, error_str: str) -> None:
        """Log ``error_str`` at error level, then raise it as a ``ConfigurationError``."""
        self._logger.error(error_str)
        raise ConfigurationError(error_str)

    def _validate_condition(self, rule_name: str, condition: Dict[str, str]) -> None:
        """Check that a condition is a non-empty dict with a known action, a string key and a truthy value."""
        if not (condition and isinstance(condition, dict)):
            self._raise_conf_exc(f"invalid condition type, not a dictionary, rule_name={rule_name}")

        allowed_actions = (
            ACTION.EQUALS.value,
            ACTION.STARTSWITH.value,
            ACTION.ENDSWITH.value,
            ACTION.CONTAINS.value,
        )
        action = condition.get(CONDITION_ACTION, "")
        if action not in allowed_actions:
            self._raise_conf_exc(f"invalid action value, rule_name={rule_name}, action={action}")

        key = condition.get(CONDITION_KEY, "")
        if not (key and isinstance(key, str)):
            self._raise_conf_exc(f"invalid key value, key has to be a non empty string, rule_name={rule_name}")

        value = condition.get(CONDITION_VALUE, "")
        if not value:
            self._raise_conf_exc(f"missing condition value, rule_name={rule_name}")

    def _validate_rule(self, feature_name: str, rule: Dict[str, Any]) -> None:
        """Check a rule's name, boolean value and condition list, then each condition in turn."""
        if not (rule and isinstance(rule, dict)):
            self._raise_conf_exc(f"feature rule is not a dictionary, feature_name={feature_name}")

        rule_name = rule.get(RULE_NAME_KEY)
        # a None name is already falsy, so it needs no separate check
        if not rule_name or not isinstance(rule_name, str):
            return self._raise_conf_exc(f"invalid rule_name, feature_name={feature_name}")

        rule_default_value = rule.get(RULE_DEFAULT_VALUE)
        # None is not a bool, so the isinstance test covers a missing value too
        if not isinstance(rule_default_value, bool):
            self._raise_conf_exc(f"invalid rule_default_value, rule_name={rule_name}")

        conditions = rule.get(CONDITIONS_KEY, {})
        if not (conditions and isinstance(conditions, list)):
            self._raise_conf_exc(f"invalid condition, rule_name={rule_name}")

        # validate conditions
        for single_condition in conditions:
            self._validate_condition(rule_name, single_condition)

    def _validate_feature(self, feature_name: str, feature_dict_def: Dict[str, Any]) -> None:
        """Check a feature's boolean default value and, when rules are present, every rule."""
        if not (feature_dict_def and isinstance(feature_dict_def, dict)):
            self._raise_conf_exc(f"invalid AWS AppConfig JSON schema detected, feature {feature_name} is invalid")

        feature_default_value = feature_dict_def.get(FEATURE_DEFAULT_VAL_KEY)
        # None is not a bool, so the isinstance test covers a missing value too
        if not isinstance(feature_default_value, bool):
            self._raise_conf_exc(f"missing feature_default_value for feature, feature_name={feature_name}")

        # validate rules
        rules = feature_dict_def.get(RULES_KEY, [])
        if not rules:
            # rules are optional: nothing further to check
            return
        if not isinstance(rules, list):
            self._raise_conf_exc(f"feature rules is not a list, feature_name={feature_name}")
        for single_rule in rules:
            self._validate_rule(feature_name, single_rule)

    def validate_json_schema(self, schema: Dict[str, Any]) -> None:
        """Validate the whole schema dictionary; raises ``ConfigurationError`` on the first problem found."""
        if not isinstance(schema, dict):
            self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, root schema is not a dictionary")

        features_dict = schema.get(FEATURES_KEY)
        if not isinstance(features_dict, dict):
            return self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, missing features dictionary")

        for name, definition in features_dict.items():
            self._validate_feature(name, definition)
Methods
def validate_json_schema(self, schema: Dict[str, Any]) ‑> NoneType
-
Expand source code
def validate_json_schema(self, schema: Dict[str, Any]) -> None:
    """Validate the whole schema dictionary, feature by feature.

    The root must be a dict and must hold a dict under FEATURES_KEY; each
    entry of that dict is then checked with ``_validate_feature``. Problems
    are reported through ``_raise_conf_exc``.
    """
    if not isinstance(schema, dict):
        self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, root schema is not a dictionary")

    features_dict = schema.get(FEATURES_KEY)
    if not isinstance(features_dict, dict):
        return self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, missing features dictionary")

    for name, definition in features_dict.items():
        self._validate_feature(name, definition)