Module aws_lambda_powertools.utilities.parser.models.kinesis
Expand source code
import base64
import logging
from binascii import Error as BinAsciiError
from typing import List
from pydantic import BaseModel, validator
from pydantic.types import PositiveInt
from ..types import Literal
logger = logging.getLogger(__name__)
# Payload carried under the ``kinesis`` key of a single stream record.
# NOTE(review): documented with comments instead of a class docstring because
# pydantic may surface class docstrings in the generated schema -- confirm
# before converting this to a docstring.
class KinesisDataStreamRecordPayload(BaseModel):
    kinesisSchemaVersion: str
    partitionKey: str
    sequenceNumber: PositiveInt
    data: bytes  # base64 encoded str is parsed into bytes
    approximateArrivalTimestamp: float

    @validator("data", pre=True)
    def data_base64_decode(cls, value):
        """Decode the base64-encoded ``data`` field into raw bytes.

        Registered with ``pre=True``, so it receives the raw incoming value
        before the ``bytes`` field validation runs.

        Raises:
            ValueError: if ``value`` cannot be base64-decoded. The original
                decoding error is attached as the explicit cause.
        """
        logger.debug("Decoding base64 Kinesis data record before parsing")
        # Keep the try body limited to the one call that can actually fail.
        try:
            return base64.b64decode(value)
        except (BinAsciiError, TypeError) as exc:
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise ValueError("base64 decode failed") from exc
# One entry of the ``Records`` list of a Kinesis Data Streams event.
# Field order is kept as declared; only layout and comments differ.
class KinesisDataStreamRecord(BaseModel):
    # Event identification; the Literal fields pin the expected constant.
    eventSource: Literal["aws:kinesis"]
    eventVersion: str
    eventID: str
    eventName: Literal["aws:kinesis:record"]

    # Invocation / origin metadata.
    invokeIdentityArn: str
    awsRegion: str
    eventSourceARN: str

    # Nested stream payload, validated by its own model.
    kinesis: KinesisDataStreamRecordPayload
# Top-level model: wraps the list of records found under ``Records``.
class KinesisDataStreamModel(BaseModel):
    # Each element is validated as a KinesisDataStreamRecord.
    Records: List[KinesisDataStreamRecord]
Classes
class KinesisDataStreamModel (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
# Top-level model: wraps the list of records found under ``Records``.
class KinesisDataStreamModel(BaseModel):
    # Each element is validated as a KinesisDataStreamRecord.
    Records: List[KinesisDataStreamRecord]
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Records : List[KinesisDataStreamRecord]
class KinesisDataStreamRecord (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
# One entry of the ``Records`` list of a Kinesis Data Streams event.
# Field order is kept as declared; only layout and comments differ.
class KinesisDataStreamRecord(BaseModel):
    # Event identification; the Literal fields pin the expected constant.
    eventSource: Literal["aws:kinesis"]
    eventVersion: str
    eventID: str
    eventName: Literal["aws:kinesis:record"]

    # Invocation / origin metadata.
    invokeIdentityArn: str
    awsRegion: str
    eventSourceARN: str

    # Nested stream payload, validated by its own model.
    kinesis: KinesisDataStreamRecordPayload
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var awsRegion : str
var eventID : str
var eventName : Literal['aws:kinesis:record']
var eventSource : Literal['aws:kinesis']
var eventSourceARN : str
var eventVersion : str
var invokeIdentityArn : str
var kinesis : KinesisDataStreamRecordPayload
class KinesisDataStreamRecordPayload (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
# Payload carried under the ``kinesis`` key of a single stream record.
# NOTE(review): documented with comments instead of a class docstring because
# pydantic may surface class docstrings in the generated schema -- confirm
# before converting this to a docstring.
class KinesisDataStreamRecordPayload(BaseModel):
    kinesisSchemaVersion: str
    partitionKey: str
    sequenceNumber: PositiveInt
    data: bytes  # base64 encoded str is parsed into bytes
    approximateArrivalTimestamp: float

    @validator("data", pre=True)
    def data_base64_decode(cls, value):
        """Decode the base64-encoded ``data`` field into raw bytes.

        Registered with ``pre=True``, so it receives the raw incoming value
        before the ``bytes`` field validation runs.

        Raises:
            ValueError: if ``value`` cannot be base64-decoded. The original
                decoding error is attached as the explicit cause.
        """
        logger.debug("Decoding base64 Kinesis data record before parsing")
        # Keep the try body limited to the one call that can actually fail.
        try:
            return base64.b64decode(value)
        except (BinAsciiError, TypeError) as exc:
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise ValueError("base64 decode failed") from exc
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var approximateArrivalTimestamp : float
var data : bytes
var kinesisSchemaVersion : str
var partitionKey : str
var sequenceNumber : pydantic.types.PositiveInt
Static methods
def data_base64_decode(value)
-
Expand source code
@validator("data", pre=True)
def data_base64_decode(cls, value):
    """Decode the base64-encoded ``data`` field into raw bytes.

    Registered with ``pre=True``, so it receives the raw incoming value
    before the ``bytes`` field validation runs.

    Raises:
        ValueError: if ``value`` cannot be base64-decoded. The original
            decoding error is attached as the explicit cause.
    """
    logger.debug("Decoding base64 Kinesis data record before parsing")
    # Keep the try body limited to the one call that can actually fail.
    try:
        return base64.b64decode(value)
    except (BinAsciiError, TypeError) as exc:
        # Chain explicitly so the root cause stays visible in tracebacks.
        raise ValueError("base64 decode failed") from exc