Module aws_lambda_powertools.utilities.streaming.transformations
Expand source code
from aws_lambda_powertools.utilities.streaming.transformations.base import BaseTransform
from aws_lambda_powertools.utilities.streaming.transformations.csv import CsvTransform
from aws_lambda_powertools.utilities.streaming.transformations.gzip import GzipTransform
from aws_lambda_powertools.utilities.streaming.transformations.zip import ZipTransform
__all__ = ["BaseTransform", "GzipTransform", "ZipTransform", "CsvTransform"]
Sub-modules
aws_lambda_powertools.utilities.streaming.transformations.base
aws_lambda_powertools.utilities.streaming.transformations.csv
aws_lambda_powertools.utilities.streaming.transformations.gzip
aws_lambda_powertools.utilities.streaming.transformations.zip
Classes
class BaseTransform (**transform_options)
-
BaseTransform is the base class all data transformations need to implement.
Parameters
transform_options
:dict
, optional- Dictionary of options that can be passed to the underlying transformation to customize the behavior.
Expand source code
class BaseTransform(Generic[T]): """ BaseTransform is the base class all data transformations need to implement. Parameters ---------- transform_options: dict, optional Dictionary of options that can be passed to the underlying transformation to customize the behavior. """ def __init__(self, **transform_options): self.transform_options = transform_options @abstractmethod def transform(self, input_stream: IO[bytes]) -> T: """ Transforms the data from input_stream into an implementation of IO[bytes]. This allows you to return your own object while still conforming to a protocol that allows transformations to be nested. """ pass
Ancestors
- typing.Generic
Subclasses
Methods
def transform(self, input_stream: IO[bytes]) ‑> ~T
-
Transforms the data from input_stream into an implementation of IO[bytes].
This allows you to return your own object while still conforming to a protocol that allows transformations to be nested.
Expand source code
@abstractmethod def transform(self, input_stream: IO[bytes]) -> T: """ Transforms the data from input_stream into an implementation of IO[bytes]. This allows you to return your own object while still conforming to a protocol that allows transformations to be nested. """ pass
class CsvTransform (**transform_options)
-
CSV data transform.
Returns a csv.DictReader that reads data from the input stream: https://docs.python.org/3/library/csv.html#csv.DictReader
Example
>>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> csv_reader = s3object.transform(CsvTransform()) >>> for row in csv_reader: >>> print(row)
Since the underlying stream of bytes needs to be converted into a stream of characters (Iterator[str]), we wrap the input into an io.TextIOWrapper. This means you have control over the text encoding and line termination options.
>>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> csv_reader = s3object.transform(CsvTransform(encoding="utf-8", newline="\r\n")) >>> for row in csv_reader: >>> print(row)
Additional options passed on the constructor, will be pased to the csv.DictReader constructor.
>>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> csv_reader = s3object.transform(CsvTransform(dialect="excel")) >>> for row in csv_reader: >>> print(row)
Expand source code
class CsvTransform(BaseTransform): """ CSV data transform. Returns a csv.DictReader that reads data from the input stream: https://docs.python.org/3/library/csv.html#csv.DictReader Example ------- >>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> csv_reader = s3object.transform(CsvTransform()) >>> for row in csv_reader: >>> print(row) Since the underlying stream of bytes needs to be converted into a stream of characters (Iterator[str]), we wrap the input into an io.TextIOWrapper. This means you have control over the text encoding and line termination options. >>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> csv_reader = s3object.transform(CsvTransform(encoding="utf-8", newline="\\r\\n")) >>> for row in csv_reader: >>> print(row) Additional options passed on the constructor, will be pased to the csv.DictReader constructor. >>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> csv_reader = s3object.transform(CsvTransform(dialect="excel")) >>> for row in csv_reader: >>> print(row) """ def transform(self, input_stream: IO[bytes]) -> DictReader: encoding = self.transform_options.pop("encoding", "utf-8") newline = self.transform_options.pop("newline", None) # csv module needs an Iterator[str], so we wrap the underlying stream into a TextIO iterator = io.TextIOWrapper(input_stream, encoding=encoding, newline=newline) return csv.DictReader(iterator, **self.transform_options)
Ancestors
- BaseTransform
- typing.Generic
Inherited members
class GzipTransform (**transform_options)
-
Gzip data transform.
Returns a gzip.GzipFile instead that reads data from the input stream: https://docs.python.org/3/library/gzip.html#gzip.GzipFile
Example
>>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import GzipTransform >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> reader = s3object.transform(GzipTransform()) >>> for line in reader: >>> print(line)
Expand source code
class GzipTransform(BaseTransform): """ Gzip data transform. Returns a gzip.GzipFile instead that reads data from the input stream: https://docs.python.org/3/library/gzip.html#gzip.GzipFile Example ------- >>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import GzipTransform >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> reader = s3object.transform(GzipTransform()) >>> for line in reader: >>> print(line) """ def transform(self, input_stream: IO[bytes]) -> GzipFile: return GzipFile(fileobj=input_stream, mode="rb", **self.transform_options)
Ancestors
- BaseTransform
- typing.Generic
Inherited members
class ZipTransform (**transform_options)
-
Zip data transform.
Returns a zip.ZipFile that reads data from the input stream: https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile
Example
>>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> zip_reader = s3object.transform(ZipTransform()) >>> for file in zip_reader.namelist(): >>> print(file) >>> zip_reader.extract(file)
Additional options passed on the constructor, will be pased to the is_csv.DictReader constructor.
>>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform >>> import zipfile >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> zip_reader = s3object.transform(ZipTransform(compression=zipfile.ZIP_LZMA)) >>> for file in zip_reader.namelist(): >>> print(file) >>> zip_reader.extract(file)
Currently, it's not possible to pipe the Zip file stream into another data transformation, since a Zip file contains multiple files, and not a single stream. However, you can still open a specific file as a stream, reading only the necessary bytes to extract it:
>>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform >>> import zipfile >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> zip_reader = s3object.transform(ZipTransform()) >>> with zip_reader.open("filename.txt") as f: >>> for line in f: >>> print(line)
Expand source code
class ZipTransform(BaseTransform): """ Zip data transform. Returns a zip.ZipFile that reads data from the input stream: https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile Example ------- >>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> zip_reader = s3object.transform(ZipTransform()) >>> for file in zip_reader.namelist(): >>> print(file) >>> zip_reader.extract(file) Additional options passed on the constructor, will be pased to the is_csv.DictReader constructor. >>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform >>> import zipfile >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> zip_reader = s3object.transform(ZipTransform(compression=zipfile.ZIP_LZMA)) >>> for file in zip_reader.namelist(): >>> print(file) >>> zip_reader.extract(file) Currently, it's not possible to pipe the Zip file stream into another data transformation, since a Zip file contains multiple files, and not a single stream. However, you can still open a specific file as a stream, reading only the necessary bytes to extract it: >>> from aws_lambda_powertools.utilities.streaming import S3Object >>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform >>> import zipfile >>> >>> s3object = S3Object(bucket="bucket", key="key") >>> zip_reader = s3object.transform(ZipTransform()) >>> with zip_reader.open("filename.txt") as f: >>> for line in f: >>> print(line) """ def transform(self, input_stream: IO[bytes]) -> ZipFile: return ZipFile(input_stream, mode="r", **self.transform_options)
Ancestors
- BaseTransform
- typing.Generic
Inherited members