Module aws_lambda_powertools.utilities.parser.models.kinesis

Expand source code
import base64
import logging
from binascii import Error as BinAsciiError
from typing import List

from pydantic import BaseModel, validator
from pydantic.types import PositiveInt

from ..types import Literal

logger = logging.getLogger(__name__)


class KinesisDataStreamRecordPayload(BaseModel):
    """Payload stored under the ``kinesis`` key of a Kinesis stream record.

    The ``data`` field is expected to arrive base64 encoded; it is decoded
    to raw ``bytes`` by :meth:`data_base64_decode` before pydantic applies
    its own ``bytes`` validation.
    """

    kinesisSchemaVersion: str
    partitionKey: str
    sequenceNumber: PositiveInt  # must parse as an integer greater than zero
    data: bytes  # base64 encoded str is parsed into bytes
    approximateArrivalTimestamp: float

    @validator("data", pre=True)
    def data_base64_decode(cls, value):
        """Decode the base64 encoded ``data`` field into raw bytes.

        Registered with ``pre=True`` so it receives the incoming value
        before pydantic's standard validation of the ``bytes`` field.

        Parameters
        ----------
        value
            The incoming ``data`` value, expected to be base64 encoded.

        Returns
        -------
        bytes
            The decoded payload.

        Raises
        ------
        ValueError
            If the value cannot be base64 decoded.
        """
        logger.debug("Decoding base64 Kinesis data record before parsing")
        # Only the decode call can raise the errors handled here, so keep the
        # try body minimal and chain the original error for easier debugging.
        try:
            return base64.b64decode(value)
        except (BinAsciiError, TypeError) as exc:
            raise ValueError("base64 decode failed") from exc


class KinesisDataStreamRecord(BaseModel):
    """A single record entry of a Kinesis stream event.

    Wraps record-level metadata together with the nested ``kinesis``
    payload model.
    """

    eventSource: Literal["aws:kinesis"]  # only this exact value is accepted
    eventVersion: str
    eventID: str
    eventName: Literal["aws:kinesis:record"]  # only this exact value is accepted
    invokeIdentityArn: str
    awsRegion: str
    eventSourceARN: str
    kinesis: KinesisDataStreamRecordPayload  # nested payload, including the decoded data


class KinesisDataStreamModel(BaseModel):
    """Top-level model of a Kinesis stream event: a list of records."""

    Records: List[KinesisDataStreamRecord]  # each item is validated as a KinesisDataStreamRecord

Classes

class KinesisDataStreamModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class KinesisDataStreamModel(BaseModel):
    """Top-level model of a Kinesis stream event: a list of records."""

    Records: List[KinesisDataStreamRecord]  # each item is validated as a KinesisDataStreamRecord

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var Records : List[KinesisDataStreamRecord]
class KinesisDataStreamRecord (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class KinesisDataStreamRecord(BaseModel):
    """A single record entry of a Kinesis stream event.

    Wraps record-level metadata together with the nested ``kinesis``
    payload model.
    """

    eventSource: Literal["aws:kinesis"]  # only this exact value is accepted
    eventVersion: str
    eventID: str
    eventName: Literal["aws:kinesis:record"]  # only this exact value is accepted
    invokeIdentityArn: str
    awsRegion: str
    eventSourceARN: str
    kinesis: KinesisDataStreamRecordPayload  # nested payload, including the decoded data

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var awsRegion : str
var eventID : str
var eventName : Literal['aws:kinesis:record']
var eventSource : Literal['aws:kinesis']
var eventSourceARN : str
var eventVersion : str
var invokeIdentityArn : str
var kinesis : KinesisDataStreamRecordPayload
class KinesisDataStreamRecordPayload (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class KinesisDataStreamRecordPayload(BaseModel):
    """Payload stored under the ``kinesis`` key of a Kinesis stream record.

    The ``data`` field is expected to arrive base64 encoded; it is decoded
    to raw ``bytes`` by :meth:`data_base64_decode` before pydantic applies
    its own ``bytes`` validation.
    """

    kinesisSchemaVersion: str
    partitionKey: str
    sequenceNumber: PositiveInt  # must parse as an integer greater than zero
    data: bytes  # base64 encoded str is parsed into bytes
    approximateArrivalTimestamp: float

    @validator("data", pre=True)
    def data_base64_decode(cls, value):
        """Decode the base64 encoded ``data`` field into raw bytes.

        Registered with ``pre=True`` so it receives the incoming value
        before pydantic's standard validation of the ``bytes`` field.

        Parameters
        ----------
        value
            The incoming ``data`` value, expected to be base64 encoded.

        Returns
        -------
        bytes
            The decoded payload.

        Raises
        ------
        ValueError
            If the value cannot be base64 decoded.
        """
        logger.debug("Decoding base64 Kinesis data record before parsing")
        # Only the decode call can raise the errors handled here, so keep the
        # try body minimal and chain the original error for easier debugging.
        try:
            return base64.b64decode(value)
        except (BinAsciiError, TypeError) as exc:
            raise ValueError("base64 decode failed") from exc

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var approximateArrivalTimestamp : float
var data : bytes
var kinesisSchemaVersion : str
var partitionKey : str
var sequenceNumber : pydantic.types.PositiveInt

Static methods

def data_base64_decode(value)
Expand source code
@validator("data", pre=True)
def data_base64_decode(cls, value):
    """Decode the base64 encoded ``data`` field into raw bytes.

    Registered with ``pre=True`` so it receives the incoming value before
    pydantic's standard validation of the field.

    Parameters
    ----------
    value
        The incoming ``data`` value, expected to be base64 encoded.

    Returns
    -------
    bytes
        The decoded payload.

    Raises
    ------
    ValueError
        If the value cannot be base64 decoded.
    """
    logger.debug("Decoding base64 Kinesis data record before parsing")
    # Only the decode call can raise the errors handled here, so keep the
    # try body minimal and chain the original error for easier debugging.
    try:
        return base64.b64decode(value)
    except (BinAsciiError, TypeError) as exc:
        raise ValueError("base64 decode failed") from exc