Module aws_lambda_powertools.utilities.parser.models.cloudwatch

Classes

class CloudWatchLogsData (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

class CloudWatchLogsData(BaseModel):
    decoded_data: CloudWatchLogsDecode = Field(None, alias="data")

    @validator("decoded_data", pre=True, allow_reuse=True)
    def prepare_data(cls, value):
        try:
            logger.debug("Decoding base64 cloudwatch log data before parsing")
            payload = base64.b64decode(value)
            logger.debug("Decompressing cloudwatch log data before parsing")
            uncompressed = zlib.decompress(payload, zlib.MAX_WBITS | 32)
            return json.loads(uncompressed.decode("utf-8"))
        except Exception:
            raise ValueError("unable to decompress data")
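
The validator above decodes the data field in three steps: base64 decode, decompress, then JSON parse. The sketch below reproduces those steps with the standard library only, to show what the zlib.MAX_WBITS | 32 argument does: it asks zlib to detect a gzip or zlib header automatically. The helper name decode_awslogs_data and the sample payload are assumptions made for illustration and are not part of the library.

```python
import base64
import gzip
import json
import zlib


def decode_awslogs_data(data: str) -> dict:
    """Hypothetical helper mirroring the three steps in prepare_data."""
    try:
        payload = base64.b64decode(data)
        # MAX_WBITS | 32 lets zlib auto-detect a gzip or zlib header.
        uncompressed = zlib.decompress(payload, zlib.MAX_WBITS | 32)
        return json.loads(uncompressed.decode("utf-8"))
    except Exception as exc:
        raise ValueError("unable to decompress data") from exc


# Made-up payload, for illustration only.
sample = {"messageType": "DATA_MESSAGE", "logEvents": []}
raw = json.dumps(sample).encode("utf-8")

# The same JSON bytes, framed once as gzip and once as zlib.
gzip_framed = base64.b64encode(gzip.compress(raw)).decode("ascii")
zlib_framed = base64.b64encode(zlib.compress(raw)).decode("ascii")

print(decode_awslogs_data(gzip_framed) == sample)
print(decode_awslogs_data(zlib_framed) == sample)
```

Both framings decode to the same dictionary, and any failure along the way surfaces as a single ValueError, as in the validator.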

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var decoded_data : CloudWatchLogsDecode

Static methods

def prepare_data(value)

class CloudWatchLogsDecode (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

class CloudWatchLogsDecode(BaseModel):
    messageType: str
    owner: str
    logGroup: str
    logStream: str
    subscriptionFilters: List[str]
    logEvents: List[CloudWatchLogsLogEvent]
    policyLevel: Optional[str] = None
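
As a rough illustration of the shape this model expects, the sketch below builds a decoded payload as a plain dictionary and checks it against the field names declared above. All values (account ID, log group, stream, filter name) are made up, and missing_fields is a hypothetical helper, not a library function; the real validation is done by pydantic.

```python
from typing import Any, Dict, List

# Field names taken from the CloudWatchLogsDecode model. policyLevel is
# Optional there, so it is not treated as required here.
REQUIRED_FIELDS = [
    "messageType",
    "owner",
    "logGroup",
    "logStream",
    "subscriptionFilters",
    "logEvents",
]


def missing_fields(payload: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list required field names absent from payload."""
    return [name for name in REQUIRED_FIELDS if name not in payload]


# Made-up values, for illustration only.
sample_payload = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/aws/lambda/example",
    "logStream": "2024/01/01/[$LATEST]example",
    "subscriptionFilters": ["example-filter"],
    "logEvents": [
        {"id": "1", "timestamp": 1440442987000, "message": "hello"},
    ],
}

print(missing_fields(sample_payload))
```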

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var logEvents : List[CloudWatchLogsLogEvent]
var logGroup : str
var logStream : str
var messageType : str
var owner : str
var policyLevel : Optional[str]
var subscriptionFilters : List[str]

class CloudWatchLogsLogEvent (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

class CloudWatchLogsLogEvent(BaseModel):
    id: str  # noqa AA03 VNE003
    timestamp: datetime
    message: Union[str, Type[BaseModel]]
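
CloudWatch Logs reports each event's timestamp as milliseconds since the Unix epoch, while the model exposes it as a datetime. The sketch below shows the equivalent conversion with the standard library; the helper name millis_to_datetime is an assumption for illustration, and the choice of UTC is made here for clarity rather than taken from the model.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_datetime(millis: int) -> datetime:
    """Hypothetical helper: convert epoch milliseconds to an aware UTC datetime."""
    # timedelta arithmetic avoids float rounding on the millisecond part.
    return EPOCH + timedelta(milliseconds=millis)


print(millis_to_datetime(1440442987000).isoformat())
```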

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var id : str
var message : Union[str, Type[pydantic.main.BaseModel]]
var timestamp : datetime.datetime

class CloudWatchLogsModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

class CloudWatchLogsModel(BaseModel):
    awslogs: CloudWatchLogsData
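
This model is the outer envelope: the event carries an awslogs key, whose data value holds the encoded payload, which in turn contains the logEvents list. The sketch below walks that nesting on a raw event dictionary using only the standard library. The function iter_log_messages and the sample event are hypothetical and stand in for what the models do during validation.

```python
import base64
import gzip
import json
import zlib
from typing import Any, Dict, Iterator, Tuple


def iter_log_messages(event: Dict[str, Any]) -> Iterator[Tuple[str, str]]:
    """Hypothetical helper: yield (id, message) for each log event in a raw event."""
    data = event["awslogs"]["data"]  # envelope -> encoded payload
    payload = zlib.decompress(base64.b64decode(data), zlib.MAX_WBITS | 32)
    decoded = json.loads(payload.decode("utf-8"))
    for log_event in decoded["logEvents"]:
        yield log_event["id"], log_event["message"]


# Build a made-up event in the same envelope shape, for illustration only.
inner = {
    "logEvents": [
        {"id": "1", "timestamp": 1440442987000, "message": "first"},
        {"id": "2", "timestamp": 1440442987001, "message": "second"},
    ]
}
encoded = base64.b64encode(gzip.compress(json.dumps(inner).encode("utf-8")))
sample_event = {"awslogs": {"data": encoded.decode("ascii")}}

for event_id, message in iter_log_messages(sample_event):
    print(event_id, message)
```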

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var awslogs : CloudWatchLogsData