Module aws_lambda_powertools.utilities.streaming.transformations.base
Expand source code
from abc import abstractmethod
from typing import IO, Generic, TypeVar
T = TypeVar("T", bound=IO[bytes])
class BaseTransform(Generic[T]):
"""
BaseTransform is the base class all data transformations need to implement.
Parameters
----------
transform_options: dict, optional
Dictionary of options that can be passed to the underlying transformation to customize the behavior.
"""
def __init__(self, **transform_options):
self.transform_options = transform_options
@abstractmethod
def transform(self, input_stream: IO[bytes]) -> T:
"""
Transforms the data from input_stream into an implementation of IO[bytes].
This allows you to return your own object while still conforming to a protocol
that allows transformations to be nested.
"""
pass
Classes
class BaseTransform (**transform_options)
-
BaseTransform is the base class all data transformations need to implement.
Parameters
transform_options
:dict
, optional- Dictionary of options that can be passed to the underlying transformation to customize the behavior.
Expand source code
class BaseTransform(Generic[T]): """ BaseTransform is the base class all data transformations need to implement. Parameters ---------- transform_options: dict, optional Dictionary of options that can be passed to the underlying transformation to customize the behavior. """ def __init__(self, **transform_options): self.transform_options = transform_options @abstractmethod def transform(self, input_stream: IO[bytes]) -> T: """ Transforms the data from input_stream into an implementation of IO[bytes]. This allows you to return your own object while still conforming to a protocol that allows transformations to be nested. """ pass
Ancestors
- typing.Generic
Subclasses
Methods
def transform(self, input_stream: IO[bytes]) ‑> ~T
-
Transforms the data from input_stream into an implementation of IO[bytes].
This allows you to return your own object while still conforming to a protocol that allows transformations to be nested.
Expand source code
@abstractmethod def transform(self, input_stream: IO[bytes]) -> T: """ Transforms the data from input_stream into an implementation of IO[bytes]. This allows you to return your own object while still conforming to a protocol that allows transformations to be nested. """ pass