Module aws_lambda_powertools.utilities.parser.models.kinesis_firehose

Expand source code
from typing import List, Optional, Type, Union

from pydantic import BaseModel, PositiveInt, validator

from aws_lambda_powertools.shared.functions import base64_decode


class KinesisFirehoseRecordMetadata(BaseModel):
    shardId: str
    partitionKey: str
    approximateArrivalTimestamp: PositiveInt
    sequenceNumber: str
    subsequenceNumber: int


class KinesisFirehoseRecord(BaseModel):
    data: Union[bytes, Type[BaseModel]]  # base64 encoded str is parsed into bytes
    recordId: str
    approximateArrivalTimestamp: PositiveInt
    kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None

    @validator("data", pre=True, allow_reuse=True)
    def data_base64_decode(cls, value):
        return base64_decode(value)


class KinesisFirehoseModel(BaseModel):
    invocationId: str
    deliveryStreamArn: str
    region: str
    sourceKinesisStreamArn: Optional[str] = None
    records: List[KinesisFirehoseRecord]

Classes

class KinesisFirehoseModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class KinesisFirehoseModel(BaseModel):
    invocationId: str
    deliveryStreamArn: str
    region: str
    sourceKinesisStreamArn: Optional[str] = None
    records: List[KinesisFirehoseRecord]

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var deliveryStreamArn : str
var invocationId : str
var records : List[KinesisFirehoseRecord]
var region : str
var sourceKinesisStreamArn : Optional[str]
class KinesisFirehoseRecord (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class KinesisFirehoseRecord(BaseModel):
    data: Union[bytes, Type[BaseModel]]  # base64 encoded str is parsed into bytes
    recordId: str
    approximateArrivalTimestamp: PositiveInt
    kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None

    @validator("data", pre=True, allow_reuse=True)
    def data_base64_decode(cls, value):
        return base64_decode(value)

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var approximateArrivalTimestamp : pydantic.types.PositiveInt
var data : Union[bytes, Type[pydantic.main.BaseModel]]
var kinesisRecordMetadata : Optional[KinesisFirehoseRecordMetadata]
var recordId : str

Static methods

def data_base64_decode(value)
Expand source code
@validator("data", pre=True, allow_reuse=True)
def data_base64_decode(cls, value):
    return base64_decode(value)
class KinesisFirehoseRecordMetadata (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class KinesisFirehoseRecordMetadata(BaseModel):
    shardId: str
    partitionKey: str
    approximateArrivalTimestamp: PositiveInt
    sequenceNumber: str
    subsequenceNumber: int

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var approximateArrivalTimestamp : pydantic.types.PositiveInt
var partitionKey : str
var sequenceNumber : str
var shardId : str
var subsequenceNumber : int