Module aws_lambda_powertools.utilities.parser.models.kinesis_firehose_sqs

Expand source code
import json
from typing import List, Optional

from pydantic import BaseModel, PositiveInt, validator

from aws_lambda_powertools.shared.functions import base64_decode
from aws_lambda_powertools.utilities.parser.models import KinesisFirehoseRecordMetadata

from .sqs import SqsRecordModel


class KinesisFirehoseSqsRecord(BaseModel):
    data: SqsRecordModel
    recordId: str
    approximateArrivalTimestamp: PositiveInt
    kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None

    @validator("data", pre=True, allow_reuse=True)
    def data_base64_decode(cls, value):
        # Firehose payload is encoded
        return json.loads(base64_decode(value))


class KinesisFirehoseSqsModel(BaseModel):
    invocationId: str
    deliveryStreamArn: str
    region: str
    sourceKinesisStreamArn: Optional[str] = None
    records: List[KinesisFirehoseSqsRecord]

Classes

class KinesisFirehoseSqsModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class KinesisFirehoseSqsModel(BaseModel):
    invocationId: str
    deliveryStreamArn: str
    region: str
    sourceKinesisStreamArn: Optional[str] = None
    records: List[KinesisFirehoseSqsRecord]

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var deliveryStreamArn : str
var invocationId : str
var records : List[KinesisFirehoseSqsRecord]
var region : str
var sourceKinesisStreamArn : Optional[str]
class KinesisFirehoseSqsRecord (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class KinesisFirehoseSqsRecord(BaseModel):
    data: SqsRecordModel
    recordId: str
    approximateArrivalTimestamp: PositiveInt
    kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None

    @validator("data", pre=True, allow_reuse=True)
    def data_base64_decode(cls, value):
        # Firehose payload is encoded
        return json.loads(base64_decode(value))

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var approximateArrivalTimestamp : pydantic.types.PositiveInt
var dataSqsRecordModel
var kinesisRecordMetadata : Optional[KinesisFirehoseRecordMetadata]
var recordId : str

Static methods

def data_base64_decode(value)
Expand source code
@validator("data", pre=True, allow_reuse=True)
def data_base64_decode(cls, value):
    # Firehose payload is encoded
    return json.loads(base64_decode(value))