Module aws_lambda_powertools.utilities.validation.base
Expand source code
import logging
from typing import Any, Dict, Optional
import fastjsonschema
import jmespath
from jmespath.exceptions import LexerError
from aws_lambda_powertools.shared.jmespath_functions import PowertoolsFunctions
from .exceptions import InvalidEnvelopeExpressionError, InvalidSchemaFormatError, SchemaValidationError
logger = logging.getLogger(__name__)
def validate_data_against_schema(data: Dict, schema: Dict, formats: Optional[Dict] = None):
    """Validate dict data against given JSON Schema

    Parameters
    ----------
    data : Dict
        Data set to be validated
    schema : Dict
        JSON Schema to validate against
    formats: Dict
        Custom formats containing a key (e.g. int64) and a value expressed as regex or callback returning bool.
        When omitted (None), an empty mapping is used.

    Raises
    ------
    SchemaValidationError
        When schema validation fails against data set
    InvalidSchemaFormatError
        When JSON schema provided is invalid
    """
    # fastjsonschema expects a mapping of custom formats. Forwarding None makes schemas that
    # use the "format" keyword fail with a TypeError while the validator is being generated,
    # which the handler below would then misreport as an invalid schema.
    if formats is None:
        formats = {}

    try:
        fastjsonschema.validate(definition=schema, data=data, formats=formats)
    except (TypeError, AttributeError, fastjsonschema.JsonSchemaDefinitionException) as e:
        # Problems with the schema/formats themselves, as opposed to the data being validated.
        raise InvalidSchemaFormatError(f"Schema received: {schema}, Formats: {formats}. Error: {e}") from e
    except fastjsonschema.JsonSchemaException as e:
        message = f"Failed schema validation. Error: {e.message}, Path: {e.path}, Data: {e.value}"  # noqa: B306, E501
        raise SchemaValidationError(message) from e
def unwrap_event_from_envelope(data: Dict, envelope: str, jmespath_options: Optional[Dict]) -> Any:
    """Searches data using JMESPath expression

    Parameters
    ----------
    data : Dict
        Data set to be filtered
    envelope : str
        JMESPath expression to filter data against
    jmespath_options : Dict
        Alternative JMESPath options to be included when evaluating the expression.
        When empty or None, options carrying PowertoolsFunctions as custom functions are used.

    Returns
    -------
    Any
        Data found using JMESPath expression given in envelope

    Raises
    ------
    InvalidEnvelopeExpressionError
        When the search raises LexerError, TypeError or UnicodeError
    """
    if not jmespath_options:
        jmespath_options = {"custom_functions": PowertoolsFunctions()}

    try:
        logger.debug(f"Envelope detected: {envelope}. JMESPath options: {jmespath_options}")
        return jmespath.search(envelope, data, options=jmespath.Options(**jmespath_options))
    except (LexerError, TypeError, UnicodeError) as e:
        # NOTE(review): any other exception type raised by jmespath propagates unwrapped - confirm intended.
        message = f"Failed to unwrap event from envelope using expression. Error: {e} Exp: {envelope}, Data: {data}"  # noqa: B306, E501
        # Chain explicitly so the original failure is recorded as the direct cause.
        raise InvalidEnvelopeExpressionError(message) from e
Functions
def unwrap_event_from_envelope(data: Dict, envelope: str, jmespath_options: Union[Dict, NoneType]) ‑> Any
-
Searches data using JMESPath expression
Parameters
data
:Dict
- Data set to be filtered
envelope
:str
- JMESPath expression to filter data against
jmespath_options
:Dict
- Alternative JMESPath options to be included when evaluating the expression
Returns
Any
- Data found using JMESPath expression given in envelope
Expand source code
def unwrap_event_from_envelope(data: Dict, envelope: str, jmespath_options: Optional[Dict]) -> Any:
    """Searches data using JMESPath expression

    Parameters
    ----------
    data : Dict
        Data set to be filtered
    envelope : str
        JMESPath expression to filter data against
    jmespath_options : Dict
        Alternative JMESPath options to be included when evaluating the expression.
        When empty or None, options carrying PowertoolsFunctions as custom functions are used.

    Returns
    -------
    Any
        Data found using JMESPath expression given in envelope

    Raises
    ------
    InvalidEnvelopeExpressionError
        When the search raises LexerError, TypeError or UnicodeError
    """
    if not jmespath_options:
        jmespath_options = {"custom_functions": PowertoolsFunctions()}

    try:
        logger.debug(f"Envelope detected: {envelope}. JMESPath options: {jmespath_options}")
        return jmespath.search(envelope, data, options=jmespath.Options(**jmespath_options))
    except (LexerError, TypeError, UnicodeError) as e:
        # NOTE(review): any other exception type raised by jmespath propagates unwrapped - confirm intended.
        message = f"Failed to unwrap event from envelope using expression. Error: {e} Exp: {envelope}, Data: {data}"  # noqa: B306, E501
        # Chain explicitly so the original failure is recorded as the direct cause.
        raise InvalidEnvelopeExpressionError(message) from e
def validate_data_against_schema(data: Dict, schema: Dict, formats: Union[Dict, NoneType] = None)
-
Validate dict data against given JSON Schema
Parameters
data
:Dict
- Data set to be validated
schema
:Dict
- JSON Schema to validate against
formats
:Dict
- Custom formats containing a key (e.g. int64) and a value expressed as regex or callback returning bool
Raises
SchemaValidationError
- When schema validation fails against data set
InvalidSchemaFormatError
- When JSON schema provided is invalid
Expand source code
def validate_data_against_schema(data: Dict, schema: Dict, formats: Optional[Dict] = None):
    """Validate dict data against given JSON Schema

    Parameters
    ----------
    data : Dict
        Data set to be validated
    schema : Dict
        JSON Schema to validate against
    formats: Dict
        Custom formats containing a key (e.g. int64) and a value expressed as regex or callback returning bool.
        When omitted (None), an empty mapping is used.

    Raises
    ------
    SchemaValidationError
        When schema validation fails against data set
    InvalidSchemaFormatError
        When JSON schema provided is invalid
    """
    # fastjsonschema expects a mapping of custom formats. Forwarding None makes schemas that
    # use the "format" keyword fail with a TypeError while the validator is being generated,
    # which the handler below would then misreport as an invalid schema.
    if formats is None:
        formats = {}

    try:
        fastjsonschema.validate(definition=schema, data=data, formats=formats)
    except (TypeError, AttributeError, fastjsonschema.JsonSchemaDefinitionException) as e:
        # Problems with the schema/formats themselves, as opposed to the data being validated.
        raise InvalidSchemaFormatError(f"Schema received: {schema}, Formats: {formats}. Error: {e}") from e
    except fastjsonschema.JsonSchemaException as e:
        message = f"Failed schema validation. Error: {e.message}, Path: {e.path}, Data: {e.value}"  # noqa: B306, E501
        raise SchemaValidationError(message) from e