Streaming

CLASS DESCRIPTION
S3Object

Seekable and streamable S3 Object reader.

S3Object

S3Object(
    bucket: str,
    key: str,
    version_id: str | None = None,
    boto3_client: S3Client | None = None,
    is_gzip: bool | None = False,
    is_csv: bool | None = False,
    **sdk_options
)

Bases: IO[bytes]

Seekable and streamable S3 Object reader.

S3Object implements the IO[bytes] interface, backed by a seekable S3 stream.

PARAMETER DESCRIPTION
bucket

The S3 bucket

TYPE: str

key

The S3 key

TYPE: str

version_id

A version ID of the object, when the S3 bucket is versioned

TYPE: str | None DEFAULT: None

boto3_client

An optional boto3 S3 client. If missing, a new one will be created.

TYPE: S3Client | None DEFAULT: None

is_gzip

Enables the Gunzip data transformation

TYPE: bool | None DEFAULT: False

is_csv

Enables the CSV data transformation

TYPE: bool | None DEFAULT: False

sdk_options

Dictionary of options that will be passed to the S3 Client get_object API call

DEFAULT: {}

Example

Reads a line from an S3 object, loading as little data as necessary:

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>>
>>> line: bytes = S3Object(bucket="bucket", key="key").readline()
>>>
>>> print(line)
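
The constructor flags and keyword options compose; a minimal sketch, assuming a gzipped CSV object behind placeholder bucket/key names (ExpectedBucketOwner is just one example of a get_object option forwarded via sdk_options):

>>> import boto3
>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>>
>>> # Reuse an existing client, and decompress + parse rows while streaming
>>> s3object = S3Object(
>>>     bucket="bucket",
>>>     key="key.csv.gz",
>>>     boto3_client=boto3.client("s3"),
>>>     is_gzip=True,
>>>     is_csv=True,
>>>     ExpectedBucketOwner="111122223333",
>>> )
>>> for row in s3object:
>>>   print(row)
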
METHOD DESCRIPTION
transform

Applies one or more data transformations to the stream.

ATTRIBUTE DESCRIPTION
size

Retrieves the size of the underlying S3 object

TYPE: int

transformed_stream

Returns an IO[bytes] stream with all the data transformations applied in order

TYPE: IO[bytes]

Source code in aws_lambda_powertools/utilities/streaming/s3_object.py
def __init__(
    self,
    bucket: str,
    key: str,
    version_id: str | None = None,
    boto3_client: S3Client | None = None,
    is_gzip: bool | None = False,
    is_csv: bool | None = False,
    **sdk_options,
):
    self.bucket = bucket
    self.key = key
    self.version_id = version_id

    # The underlying seekable IO, where all the magic happens
    self.raw_stream = _S3SeekableIO(
        bucket=bucket,
        key=key,
        version_id=version_id,
        boto3_client=boto3_client,
        **sdk_options,
    )

    # Stores the list of data transformations
    self._data_transformations: list[BaseTransform] = []
    if is_gzip:
        self._data_transformations.append(GzipTransform())
    if is_csv:
        self._data_transformations.append(CsvTransform())

    # Stores the cached transformed stream
    self._transformed_stream: IO[bytes] | None = None

size property

size: int

Retrieves the size of the underlying S3 object
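
A minimal sketch (placeholder bucket/key) combining size with seek to read only the tail of an object; it assumes the object holds at least 100 bytes:

>>> import io
>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> # Jump to the last 100 bytes without reading what comes before
>>> s3object.seek(s3object.size - 100, io.SEEK_SET)
>>> print(s3object.read(100))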

transformed_stream property

transformed_stream: IO[bytes]

Returns an IO[bytes] stream with all the data transformations applied in order
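
For instance, after an in-place transform the property is built on access with all registered transformations applied; a sketch with placeholder names:

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import GzipTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> s3object.transform(GzipTransform(), in_place=True)
>>> for line in s3object.transformed_stream:
>>>   print(line)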

transform

transform(
    transformations: (
        BaseTransform[T] | Sequence[BaseTransform[T]]
    ),
    in_place: Literal[True],
) -> T
transform(
    transformations: (
        BaseTransform[T] | Sequence[BaseTransform[T]]
    ),
    in_place: Literal[False],
) -> None
transform(
    transformations: (
        BaseTransform[T] | Sequence[BaseTransform[T]]
    ),
) -> T
transform(
    transformations: (
        BaseTransform[T] | Sequence[BaseTransform[T]]
    ),
    in_place: bool | None = False,
) -> T | None

Applies one or more data transformations to the stream.

PARAMETER DESCRIPTION
transformations

One or more transformations to apply. Transformations are applied in the same order as they are declared.

TYPE: BaseTransform[T] | Sequence[BaseTransform[T]]

in_place

Transforms the stream in place, instead of returning a new stream object. Defaults to false.

TYPE: bool | None DEFAULT: False

RETURNS DESCRIPTION
T[bound=IO[bytes]], optional

If in_place is False, returns an IO[bytes] object representing the transformed stream
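
As an illustration of passing a sequence of transformations (a sketch, assuming the object under the placeholder key is a gzipped CSV):

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform, GzipTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key.csv.gz")
>>> # Applied in declaration order: gunzip first, then CSV parsing
>>> csv_reader = s3object.transform([GzipTransform(), CsvTransform()])
>>> for row in csv_reader:
>>>   print(row)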

Source code in aws_lambda_powertools/utilities/streaming/s3_object.py
def transform(
    self,
    transformations: BaseTransform[T] | Sequence[BaseTransform[T]],
    in_place: bool | None = False,
) -> T | None:
    """
    Applies one or more data transformations to the stream.

    Parameters
    ----------
    transformations: BaseTransform[T] | Sequence[BaseTransform[T]]
        One or more transformations to apply. Transformations are applied in the same order as they are declared.
    in_place: bool, optional
        Transforms the stream in place, instead of returning a new stream object. Defaults to false.

    Returns
    -------
    T[bound=IO[bytes]], optional
        If in_place is False, returns an IO[bytes] object representing the transformed stream
    """
    # Make transformations always be a sequence to make mypy happy
    if not isinstance(transformations, Sequence):
        transformations = [transformations]

    # Scenario 1: user wants to transform the stream in place.
    # In this case, we store the transformations and invalidate any existing transformed stream.
    # This way, the transformed_stream is re-created on the next IO operation.
    # This can happen when the user calls .transform multiple times before they start reading data
    #
    #   >>> s3object.transform(GzipTransform(), in_place=True)
    #   >>> s3object.seek(0, io.SEEK_SET) <- this creates a transformed stream
    #   >>> s3object.transform(CsvTransform(), in_place=True) <- need to re-create transformed stream
    #   >>> s3object.read...
    if in_place:
        self._data_transformations.extend(transformations)

        # Invalidate any existing transformed stream.
        # It will be created again next time it's accessed.
        self._transformed_stream = None
        return None
    else:
        # Tell mypy that raw_stream actually implements T (bound to IO[bytes])
        stream = cast(T, self.raw_stream)
        for transformation in transformations:
            stream = transformation.transform(stream)
        return stream
CLASS DESCRIPTION
CsvTransform

CSV data transform.

CsvTransform

CsvTransform(**transform_options)

Bases: BaseTransform

CSV data transform.

Returns a csv.DictReader that reads data from the input stream: https://docs.python.org/3/library/csv.html#csv.DictReader

Example
>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> csv_reader = s3object.transform(CsvTransform())
>>> for row in csv_reader:
>>>   print(row)

Since the underlying stream of bytes needs to be converted into a stream of characters (Iterator[str]), we wrap the input into an io.TextIOWrapper. This means you have control over the text encoding and line termination options.

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> csv_reader = s3object.transform(CsvTransform(encoding="utf-8", newline="\r\n"))
>>> for row in csv_reader:
>>>   print(row)

Additional options passed to the constructor will be passed to the csv.DictReader constructor.

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> csv_reader = s3object.transform(CsvTransform(dialect="excel"))
>>> for row in csv_reader:
>>>   print(row)
Source code in aws_lambda_powertools/utilities/streaming/transformations/base.py
def __init__(self, **transform_options):
    self.transform_options = transform_options
CLASS DESCRIPTION
GzipTransform

Gzip data transform.

GzipTransform

GzipTransform(**transform_options)

Bases: BaseTransform

Gzip data transform.

Returns a gzip.GzipFile instance that reads data from the input stream: https://docs.python.org/3/library/gzip.html#gzip.GzipFile

Example
>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import GzipTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> reader = s3object.transform(GzipTransform())
>>> for line in reader:
>>>   print(line)
Source code in aws_lambda_powertools/utilities/streaming/transformations/base.py
def __init__(self, **transform_options):
    self.transform_options = transform_options
CLASS DESCRIPTION
ZipTransform

Zip data transform.

ZipTransform

ZipTransform(**transform_options)

Bases: BaseTransform

Zip data transform.

Returns a zipfile.ZipFile that reads data from the input stream: https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile

Example
>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> zip_reader = s3object.transform(ZipTransform())
>>> for file in zip_reader.namelist():
>>>   print(file)
>>>   zip_reader.extract(file)

Additional options passed to the constructor will be passed to the zipfile.ZipFile constructor.

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform
>>> import zipfile
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> zip_reader = s3object.transform(ZipTransform(compression=zipfile.ZIP_LZMA))
>>> for file in zip_reader.namelist():
>>>   print(file)
>>>   zip_reader.extract(file)

Currently, it's not possible to pipe the Zip file stream into another data transformation, since a Zip file contains multiple files, and not a single stream. However, you can still open a specific file as a stream, reading only the necessary bytes to extract it:

>>> from aws_lambda_powertools.utilities.streaming import S3Object
>>> from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform
>>> import zipfile
>>>
>>> s3object = S3Object(bucket="bucket", key="key")
>>> zip_reader = s3object.transform(ZipTransform())
>>> with zip_reader.open("filename.txt") as f:
>>>   for line in f:
>>>      print(line)
Source code in aws_lambda_powertools/utilities/streaming/transformations/base.py
def __init__(self, **transform_options):
    self.transform_options = transform_options