Module aws_lambda_powertools.utilities.data_masking.base
Classes
class DataMasking (provider: Optional[BaseProvider] = None, raise_on_missing_field: bool = True)
-
The DataMasking class orchestrates erasing, encrypting, and decrypting for the base provider.
Example:
from aws_lambda_powertools.utilities.data_masking.base import DataMasking def lambda_handler(event, context): masker = DataMasking() data = { "project": "powertools", "sensitive": "password" } erased = masker.erase(data,fields=["sensitive"]) return erased
Expand source code
class DataMasking: """ The DataMasking class orchestrates erasing, encrypting, and decrypting for the base provider. Example: ``` from aws_lambda_powertools.utilities.data_masking.base import DataMasking def lambda_handler(event, context): masker = DataMasking() data = { "project": "powertools", "sensitive": "password" } erased = masker.erase(data,fields=["sensitive"]) return erased ``` """ def __init__( self, provider: Optional[BaseProvider] = None, raise_on_missing_field: bool = True, ): self.provider = provider or BaseProvider() # NOTE: we depend on Provider to not confuse customers in passing the same 2 serializers in 2 places self.json_serializer = self.provider.json_serializer self.json_deserializer = self.provider.json_deserializer self.raise_on_missing_field = raise_on_missing_field def encrypt( self, data: dict | Mapping | Sequence | Number, provider_options: dict | None = None, **encryption_context: str, ) -> str: return self._apply_action( data=data, fields=None, action=self.provider.encrypt, provider_options=provider_options or {}, **encryption_context, ) def decrypt( self, data, provider_options: dict | None = None, **encryption_context: str, ) -> Any: return self._apply_action( data=data, fields=None, action=self.provider.decrypt, provider_options=provider_options or {}, **encryption_context, ) @overload def erase(self, data, fields: None) -> str: ... @overload def erase(self, data: list, fields: list[str]) -> list[str]: ... @overload def erase(self, data: tuple, fields: list[str]) -> tuple[str]: ... @overload def erase(self, data: dict, fields: list[str]) -> dict: ... def erase(self, data: Sequence | Mapping, fields: list[str] | None = None) -> str | list[str] | tuple[str] | dict: return self._apply_action(data=data, fields=fields, action=self.provider.erase) def _apply_action( self, data, fields: list[str] | None, action: Callable, provider_options: dict | None = None, **encryption_context: str, ): """ Helper method to determine whether to apply a given action to the entire input data or to specific fields if the 'fields' argument is specified. Parameters ---------- data : str | dict The input data to process. fields : Optional[List[str]] A list of fields to apply the action to. If 'None', the action is applied to the entire 'data'. action : Callable The action to apply to the data. It should be a callable that performs an operation on the data and returns the modified value. provider_options : dict Provider specific keyword arguments to propagate; used as an escape hatch. encryption_context: str Encryption context to use in encrypt and decrypt operations. Returns ------- any The modified data after applying the action. """ if fields is not None: logger.debug(f"Running action {action.__name__} with fields {fields}") return self._apply_action_to_fields( data=data, fields=fields, action=action, provider_options=provider_options, **encryption_context, ) else: logger.debug(f"Running action {action.__name__} with the entire data") return action(data=data, provider_options=provider_options, **encryption_context) def _apply_action_to_fields( self, data: Union[dict, str], fields: list, action: Callable, provider_options: dict | None = None, **encryption_context: str, ) -> Union[dict, str]: """ This method takes the input data, which can be either a dictionary or a JSON string, and erases, encrypts, or decrypts the specified fields. Parameters ---------- data : Union[dict, str]) The input data to process. It can be either a dictionary or a JSON string. fields : List A list of fields to apply the action to. Each field can be specified as a string or a list of strings representing nested keys in the dictionary. action : Callable The action to apply to the fields. It should be a callable that takes the current value of the field as the first argument and any additional arguments that might be required for the action. It performs an operation on the current value using the provided arguments and returns the modified value. provider_options : dict Optional dictionary representing additional options for the action. **encryption_context: str Additional keyword arguments collected into a dictionary. Returns ------- dict | str The modified dictionary or string after applying the action to the specified fields. Raises ------- ValueError If 'fields' parameter is None. TypeError If the 'data' parameter is not a traversable type Example ------- ```python >>> data = {'a': {'b': {'c': 1}}, 'x': {'y': 2}} >>> fields = ['a.b.c', 'a.x.y'] # The function will transform the value at 'a.b.c' (1) and 'a.x.y' (2) # and store the result as: new_dict = {'a': {'b': {'c': '*****'}}, 'x': {'y': '*****'}} ``` """ data_parsed: dict = self._normalize_data_to_parse(fields, data) # For in-place updates, json_parse accepts a callback function # this function must receive 3 args: field_value, fields, field_name # We create a partial callback to pre-populate known options (action, provider opts, enc ctx) update_callback = functools.partial( self._call_action, action=action, provider_options=provider_options, **encryption_context, ) # Iterate over each field to be parsed. for field_parse in fields: # Parse the field expression using a 'parse' function. json_parse = parse(field_parse) # Find the corresponding keys in the normalized data using the parsed expression. result_parse = json_parse.find(data_parsed) if not result_parse: if self.raise_on_missing_field: # If the data for the field is not found, raise an exception. raise DataMaskingFieldNotFoundError(f"Field or expression {field_parse} not found in {data_parsed}") else: # If the data for the field is not found, warning. warnings.warn(f"Field or expression {field_parse} not found in {data_parsed}", stacklevel=2) # For in-place updates, json_parse accepts a callback function # that receives 3 args: field_value, fields, field_name # We create a partial callback to pre-populate known provider options (action, provider opts, enc ctx) update_callback = functools.partial( self._call_action, action=action, provider_options=provider_options, **encryption_context, ) json_parse.update( data_parsed, lambda field_value, fields, field_name: update_callback(field_value, fields, field_name), # noqa: B023 ) return data_parsed @staticmethod def _call_action( field_value: Any, fields: dict[str, Any], field_name: str, action: Callable, provider_options: dict | None = None, **encryption_context, ) -> None: """ Apply a specified action to a field value and update the fields dictionary. Params: -------- - field_value: Current value of the field being processed. - fields: Dictionary representing the fields being processed (mutable). - field_name: Name of the field being processed. - action: Callable (function or method) to be applied to the field_value. - provider_options: Optional dictionary representing additional options for the action. - **encryption_context: Additional keyword arguments collected into a dictionary. Returns: - fields[field_name]: Returns the processed field value """ fields[field_name] = action(field_value, provider_options=provider_options, **encryption_context) return fields[field_name] def _normalize_data_to_parse(self, fields: list, data: str | dict) -> dict: if not fields: raise ValueError("No fields specified.") if isinstance(data, str): # Parse JSON string as dictionary data_parsed = self.json_deserializer(data) elif isinstance(data, dict): # Convert the data to a JSON string in case it contains non-string keys (e.g., ints) # Parse the JSON string back into a dictionary data_parsed = self.json_deserializer(self.json_serializer(data)) else: raise DataMaskingUnsupportedTypeError( f"Unsupported data type. Expected a traversable type (dict or str), but got {type(data)}.", ) return data_parsed
Methods
def decrypt(self, data, provider_options: dict | None = None, **encryption_context: str) ‑> Any
def encrypt(self, data: dict | Mapping | Sequence | Number, provider_options: dict | None = None, **encryption_context: str) ‑> str
def erase(self, data: Sequence | Mapping, fields: list[str] | None = None) ‑> str | list[str] | tuple[str] | dict