Module aws_lambda_powertools.utilities.parser.models.kinesis_firehose_sqs
Expand source code
import json
from typing import List, Optional
from pydantic import BaseModel, PositiveInt, validator
from aws_lambda_powertools.shared.functions import base64_decode
from aws_lambda_powertools.utilities.parser.models import KinesisFirehoseRecordMetadata
from .sqs import SqsRecordModel
class KinesisFirehoseSqsRecord(BaseModel):
data: SqsRecordModel
recordId: str
approximateArrivalTimestamp: PositiveInt
kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None
@validator("data", pre=True, allow_reuse=True)
def data_base64_decode(cls, value):
# Firehose payload is encoded
return json.loads(base64_decode(value))
class KinesisFirehoseSqsModel(BaseModel):
invocationId: str
deliveryStreamArn: str
region: str
sourceKinesisStreamArn: Optional[str] = None
records: List[KinesisFirehoseSqsRecord]
Classes
class KinesisFirehoseSqsModel (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class KinesisFirehoseSqsModel(BaseModel): invocationId: str deliveryStreamArn: str region: str sourceKinesisStreamArn: Optional[str] = None records: List[KinesisFirehoseSqsRecord]
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var deliveryStreamArn : str
var invocationId : str
var records : List[KinesisFirehoseSqsRecord]
var region : str
var sourceKinesisStreamArn : Optional[str]
class KinesisFirehoseSqsRecord (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class KinesisFirehoseSqsRecord(BaseModel): data: SqsRecordModel recordId: str approximateArrivalTimestamp: PositiveInt kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None @validator("data", pre=True, allow_reuse=True) def data_base64_decode(cls, value): # Firehose payload is encoded return json.loads(base64_decode(value))
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var approximateArrivalTimestamp : pydantic.types.PositiveInt
var data : SqsRecordModel
var kinesisRecordMetadata : Optional[KinesisFirehoseRecordMetadata]
var recordId : str
Static methods
def data_base64_decode(value)
-
Expand source code
@validator("data", pre=True, allow_reuse=True) def data_base64_decode(cls, value): # Firehose payload is encoded return json.loads(base64_decode(value))