Module aws_lambda_powertools.utilities.streaming.transformations

Sub-modules

aws_lambda_powertools.utilities.streaming.transformations.base
aws_lambda_powertools.utilities.streaming.transformations.csv
aws_lambda_powertools.utilities.streaming.transformations.gzip
aws_lambda_powertools.utilities.streaming.transformations.zip

Classes

class BaseTransform (**transform_options)

BaseTransform is the base class all data transformations need to implement.

Parameters

transform_options : dict, optional
Dictionary of options that can be passed to the underlying transformation to customize the behavior.

Source code

class BaseTransform(Generic[T]):
    """
    BaseTransform is the base class all data transformations need to implement.

    Parameters
    ----------
    transform_options: dict, optional
        Dictionary of options that can be passed to the underlying transformation to customize the behavior.
    """

    def __init__(self, **transform_options):
        self.transform_options = transform_options

    @abstractmethod
    def transform(self, input_stream: IO[bytes]) -> T:
        """
        Transforms the data from input_stream into an implementation of IO[bytes].

        This allows you to return your own object while still conforming to a protocol
        that allows transformations to be nested.
        """
        pass

Ancestors

  • typing.Generic

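Subclasses

  • aws_lambda_powertools.utilities.streaming.transformations.csv.CsvTransform
  • aws_lambda_powertools.utilities.streaming.transformations.gzip.GzipTransform
  • aws_lambda_powertools.utilities.streaming.transformations.zip.ZipTransform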

Methods

def transform(self, input_stream: IO[bytes]) ‑> ~T

Transforms the data from input_stream into an implementation of IO[bytes].

This allows you to return your own object while still conforming to a protocol that allows transformations to be nested.
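
As a minimal sketch of what implementing this interface can look like, the block below defines a hypothetical JsonLinesTransform (not part of the library) that yields one parsed JSON value per line. So that it runs without the library installed, it uses a local stand-in that mirrors the BaseTransform source shown above; in real code you would import BaseTransform from aws_lambda_powertools.utilities.streaming.transformations instead. The helper parse_json_lines and the in-memory stream are also assumptions made for illustration.

```python
import io
import json
from abc import abstractmethod
from typing import IO, Any, Generic, Iterator, TypeVar

T = TypeVar("T")


class BaseTransform(Generic[T]):
    """Local stand-in mirroring the BaseTransform source shown above."""

    def __init__(self, **transform_options):
        self.transform_options = transform_options

    @abstractmethod
    def transform(self, input_stream: IO[bytes]) -> T:
        pass


class JsonLinesTransform(BaseTransform):
    """Hypothetical transform: yields one parsed JSON value per non-empty line."""

    def transform(self, input_stream: IO[bytes]) -> Iterator[Any]:
        # Read the option without mutating transform_options, so the same
        # transform instance can be reused on another stream.
        encoding = self.transform_options.get("encoding", "utf-8")
        text = io.TextIOWrapper(input_stream, encoding=encoding)
        return (json.loads(line) for line in text if line.strip())


def parse_json_lines(payload: bytes) -> list:
    """Run the transform over an in-memory stream standing in for S3 data."""
    return list(JsonLinesTransform().transform(io.BytesIO(payload)))
```

Because this sketch returns an iterator of Python objects rather than a binary stream, it would have to be the last step in a chain. A transform meant to be nested should return a readable binary stream instead.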

class CsvTransform (**transform_options)

CSV data transform.

Returns a csv.DictReader that reads data from the input stream: https://docs.python.org/3/library/csv.html#csv.DictReader

Example

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> csv_reader = s3object.transform(CsvTransform())
>>> for row in csv_reader:
...     print(row)

Because the csv module needs a stream of characters (Iterator[str]) rather than bytes, the input stream is wrapped in an io.TextIOWrapper. You can therefore control the text encoding and newline handling through the encoding and newline options.

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> csv_reader = s3object.transform(CsvTransform(encoding="utf-8", newline="\r\n"))
>>> for row in csv_reader:
...     print(row)

Any other options passed to the constructor are forwarded to the csv.DictReader constructor.

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> csv_reader = s3object.transform(CsvTransform(dialect="excel"))
>>> for row in csv_reader:
...     print(row)
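
The option handling described above can be reproduced with the standard library alone. The sketch below uses an in-memory io.BytesIO as a stand-in for the S3 stream and a hypothetical helper, read_csv_rows, that is not part of the library: encoding and newline go to io.TextIOWrapper, and every other keyword argument is forwarded to csv.DictReader, following the source shown for this class.

```python
import csv
import io


def read_csv_rows(raw: bytes, encoding: str = "utf-8", newline=None, **reader_options):
    """Hypothetical helper: decode a byte stream and parse it as CSV rows."""
    byte_stream = io.BytesIO(raw)  # stand-in for the S3 object stream
    # csv needs text, so the byte stream is wrapped in a TextIOWrapper first
    text_stream = io.TextIOWrapper(byte_stream, encoding=encoding, newline=newline)
    # Remaining keyword arguments (dialect, delimiter, ...) go to DictReader
    return list(csv.DictReader(text_stream, **reader_options))


# Latin-1 encoded, semicolon-delimited input
raw = "name;city\nJosé;Málaga\n".encode("latin-1")
rows = read_csv_rows(raw, encoding="latin-1", delimiter=";")
```

Reading the same bytes with the default UTF-8 encoding would raise a UnicodeDecodeError, which is why the encoding option matters for files not written as UTF-8.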

Source code

class CsvTransform(BaseTransform):
    """
    CSV data transform.

    Returns a csv.DictReader that reads data from the input stream:
    https://docs.python.org/3/library/csv.html#csv.DictReader

    Example
    -------

        >>> from aws_lambda_powertools.utilities.streaming import S3Object
        >>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
        >>>
        >>> s3object = S3Object(bucket="bucket", key="key")
        >>> csv_reader = s3object.transform(CsvTransform())
        >>> for row in csv_reader:
        ...     print(row)

    Because the csv module needs a stream of characters (Iterator[str]) rather than bytes,
    the input stream is wrapped in an io.TextIOWrapper. You can therefore control the text
    encoding and newline handling through the encoding and newline options.

        >>> from aws_lambda_powertools.utilities.streaming import S3Object
        >>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
        >>>
        >>> s3object = S3Object(bucket="bucket", key="key")
        >>> csv_reader = s3object.transform(CsvTransform(encoding="utf-8", newline="\\r\\n"))
        >>> for row in csv_reader:
        ...     print(row)

    Any other options passed to the constructor are forwarded to the csv.DictReader constructor.

        >>> from aws_lambda_powertools.utilities.streaming import S3Object
        >>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
        >>>
        >>> s3object = S3Object(bucket="bucket", key="key")
        >>> csv_reader = s3object.transform(CsvTransform(dialect="excel"))
        >>> for row in csv_reader:
        ...     print(row)
    """

    def transform(self, input_stream: IO[bytes]) -> DictReader:
        encoding = self.transform_options.pop("encoding", "utf-8")
        newline = self.transform_options.pop("newline", None)

        # csv module needs an Iterator[str], so we wrap the underlying stream into a TextIO
        iterator = io.TextIOWrapper(input_stream, encoding=encoding, newline=newline)
        return csv.DictReader(iterator, **self.transform_options)

Ancestors

  • BaseTransform
  • typing.Generic

Inherited members

  • BaseTransform: transform

class GzipTransform (**transform_options)

Gzip data transform.

Returns a gzip.GzipFile that reads data from the input stream: https://docs.python.org/3/library/gzip.html#gzip.GzipFile

Example

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import GzipTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> reader = s3object.transform(GzipTransform())
>>> for line in reader:
...     print(line)
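
A standard-library sketch of the same wrapping follows, with an in-memory payload standing in for a compressed S3 object (an assumption made so the block runs offline). The gzip.GzipFile(fileobj=..., mode="rb") call matches the source shown for this class. The second half illustrates why this transform can be nested: the GzipFile is itself a binary stream, so it can feed the TextIOWrapper and DictReader wrapping that CsvTransform uses.

```python
import csv
import gzip
import io

# Build a small gzip payload in memory (stand-in for a compressed S3 object)
compressed = gzip.compress(b"id,name\n1,alpha\n2,beta\n")

# Wrap the compressed byte stream; data is decompressed as it is read
reader = gzip.GzipFile(fileobj=io.BytesIO(compressed), mode="rb")
# Iterating a GzipFile yields bytes lines, newline included
raw_lines = list(reader)

# GzipFile is itself a binary stream, so it can feed a further step,
# here the same TextIOWrapper + DictReader wrapping that CsvTransform uses
nested = gzip.GzipFile(fileobj=io.BytesIO(compressed), mode="rb")
rows = list(csv.DictReader(io.TextIOWrapper(nested, encoding="utf-8")))
```

Note that the lines printed by the example above are bytes objects; decode them if you need text.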

Source code

class GzipTransform(BaseTransform):
    """
    Gzip data transform.

    Returns a gzip.GzipFile that reads data from the input stream:
    https://docs.python.org/3/library/gzip.html#gzip.GzipFile

    Example
    -------

        >>> from aws_lambda_powertools.utilities.streaming import S3Object
        >>> from aws_lambda_powertools.utilities.streaming.transformations import GzipTransform
        >>>
        >>> s3object = S3Object(bucket="bucket", key="key")
        >>> reader = s3object.transform(GzipTransform())
        >>> for line in reader:
        ...     print(line)

    """

    def transform(self, input_stream: IO[bytes]) -> GzipFile:
        return GzipFile(fileobj=input_stream, mode="rb", **self.transform_options)

Ancestors

  • BaseTransform
  • typing.Generic

Inherited members

  • BaseTransform: transform

class ZipTransform (**transform_options)

Zip data transform.

Returns a zipfile.ZipFile that reads data from the input stream: https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile

Example

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> zip_reader = s3object.transform(ZipTransform())
>>> for file in zip_reader.namelist():
...     print(file)
...     zip_reader.extract(file)

Additional options passed to the constructor are forwarded to the zipfile.ZipFile constructor.

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform
>>> import zipfile
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> zip_reader = s3object.transform(ZipTransform(compression=zipfile.ZIP_LZMA))
>>> for file in zip_reader.namelist():
...     print(file)
...     zip_reader.extract(file)

Currently, it's not possible to pipe the Zip file stream into another data transformation, since a Zip file contains multiple files, and not a single stream. However, you can still open a specific file as a stream, reading only the necessary bytes to extract it:

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform
>>> import zipfile
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> zip_reader = s3object.transform(ZipTransform())
>>> with zip_reader.open("filename.txt") as f:
...     for line in f:
...         print(line)
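
The member-level streaming described above can be sketched with the standard library alone. The archive is built in memory as a stand-in for a Zip object in S3, and the file names are made up for illustration. ZipFile(..., mode="r") matches the call in the source shown for this class.

```python
import io
import zipfile

# Build a two-file archive in memory (stand-in for a Zip object in S3)
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, mode="w") as archive:
    archive.writestr("filename.txt", "alpha\nbeta\n")
    archive.writestr("other.txt", "ignored\n")
buffer.seek(0)

# Open the archive for reading from the seekable byte stream
zip_reader = zipfile.ZipFile(buffer, mode="r")
names = zip_reader.namelist()

# Open one member as a binary stream and decode it line by line
with zip_reader.open("filename.txt") as member:
    lines = [line.decode("utf-8").rstrip("\n") for line in member]
```

zipfile needs a seekable file object here because it locates members through the central directory stored at the end of the archive. Only the member that is opened gets decompressed.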

Source code

class ZipTransform(BaseTransform):
    """
    Zip data transform.

    Returns a zipfile.ZipFile that reads data from the input stream:
    https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile

    Example
    -------

        >>> from aws_lambda_powertools.utilities.streaming import S3Object
        >>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform
        >>>
        >>> s3object = S3Object(bucket="bucket", key="key")
        >>> zip_reader = s3object.transform(ZipTransform())
        >>> for file in zip_reader.namelist():
        ...     print(file)
        ...     zip_reader.extract(file)

    Additional options passed to the constructor are forwarded to the zipfile.ZipFile constructor.

        >>> from aws_lambda_powertools.utilities.streaming import S3Object
        >>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform
        >>> import zipfile
        >>>
        >>> s3object = S3Object(bucket="bucket", key="key")
        >>> zip_reader = s3object.transform(ZipTransform(compression=zipfile.ZIP_LZMA))
        >>> for file in zip_reader.namelist():
        ...     print(file)
        ...     zip_reader.extract(file)

    Currently, it's not possible to pipe the Zip file stream into another data transformation,
    since a Zip file contains multiple files, and not a single stream. However, you can still
    open a specific file as a stream, reading only the necessary bytes to extract it:

        >>> from aws_lambda_powertools.utilities.streaming import S3Object
        >>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform
        >>> import zipfile
        >>>
        >>> s3object = S3Object(bucket="bucket", key="key")
        >>> zip_reader = s3object.transform(ZipTransform())
        >>> with zip_reader.open("filename.txt") as f:
        ...     for line in f:
        ...         print(line)
    """

    def transform(self, input_stream: IO[bytes]) -> ZipFile:
        return ZipFile(input_stream, mode="r", **self.transform_options)

Ancestors

  • BaseTransform
  • typing.Generic

Inherited members

  • BaseTransform: transform