Within Lambda, processing S3 objects larger than the allocated amount of memory can lead to out-of-memory or timeout situations. For cost efficiency, your S3 objects may also be encoded and compressed in various formats (gzip, CSV, zip, etc.), increasing the amount of non-business logic you have to write and the reliability risks you take on.
Streaming utility makes this process easier by fetching parts of your data only as you consume them, and by transparently applying data transformations to the stream. This allows you to process one, a few, or all rows of your large dataset while consuming only a few MBs of memory.
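Here is a minimal sketch of the end-to-end flow, assuming a gzipped CSV object and the is_gzip/is_csv constructor flags for the built-in transformations:

from typing import Dict

from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: Dict[str, str], context: LambdaContext):
    # is_gzip/is_csv (assumed here) ask S3Object to decompress and parse rows as it streams
    s3 = S3Object(bucket=event["bucket"], key=event["key"], is_gzip=True, is_csv=True)
    for line in s3:
        print(line)  # each row is returned as a dict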
Alternatively, you can apply transformations later via the transform method. By default, it returns a new transformed stream that you can use to read the contents. If you prefer in-place modifications, use in_place=True.
When is this useful?
In scenarios where you have reusable logic to apply common transformations. This might be a function or a class that receives an instance of S3Object.
from typing import Dict

from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
from aws_lambda_powertools.utilities.streaming.transformations import (
    CsvTransform,
    GzipTransform,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: Dict[str, str], context: LambdaContext):
    s3 = S3Object(bucket=event["bucket"], key=event["key"])
    data = s3.transform([GzipTransform(), CsvTransform()])
    for line in data:
        print(line)  # returns a dict
Note that when using in_place=True, the method returns None.
from typing import Dict

from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
from aws_lambda_powertools.utilities.streaming.transformations import (
    CsvTransform,
    GzipTransform,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: Dict[str, str], context: LambdaContext):
    s3 = S3Object(bucket=event["bucket"], key=event["key"])
    s3.transform([GzipTransform(), CsvTransform()], in_place=True)
    for line in s3:
        print(line)  # returns a dict
S3Object implements the Python I/O interface. This means you can use seek to start reading the contents of your file from any particular position, saving you processing time.
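As a minimal sketch (assuming these I/O methods are exposed by your version of the utility), you can combine seek, read, and tell much like a regular file object:

import io
from typing import Dict

from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: Dict[str, str], context: LambdaContext):
    s3 = S3Object(bucket=event["bucket"], key=event["key"])

    s3.seek(0, io.SEEK_SET)     # start from the beginning of the object
    first_bytes = s3.read(128)  # fetch only the first 128 bytes from S3
    position = s3.tell()        # current position in the stream (128)
    print(first_bytes, position)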
For example, let's imagine you have a large CSV file, each row has a non-uniform size (bytes), and you want to read and process the last row only.
non_uniform_sample.csv
id,name,location
1,Ruben Fonseca, Denmark
2,Heitor Lessa, Netherlands
3,Leandro Damascena, Portugal
You found out that the last row is exactly 30 bytes long. We can use seek() to jump back 30 bytes from the end of the file, read those 30 bytes, then transform them to CSV.
Reading only the last CSV row
import io
from typing import Dict

from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
from aws_lambda_powertools.utilities.typing import LambdaContext

LAST_ROW_SIZE = 30
CSV_HEADERS = ["id", "name", "location"]


def lambda_handler(event: Dict[str, str], context: LambdaContext):
    sample_csv = S3Object(bucket=event["bucket"], key="sample.csv")

    # From the end of the file, jump exactly 30 bytes backwards
    sample_csv.seek(-LAST_ROW_SIZE, io.SEEK_END)

    # Transform portion of data into CSV with our headers
    sample_csv.transform(CsvTransform(fieldnames=CSV_HEADERS), in_place=True)

    # We will only read the last portion of the file from S3
    # as we're only interested in the last 'location' from our dataset
    for last_row in sample_csv:
        print(last_row["location"])
If your rows have a uniform size instead, you can combine seek() calls to skip the header and jump ahead a fixed number of rows before transforming the rest of the stream:

import io
from typing import Dict

from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
from aws_lambda_powertools.utilities.typing import LambdaContext

"""
Assuming every row after the CSV header has 8 bytes of data + 1 byte newline:

reading,position,type
21.3,5,+
23.4,4,+
21.3,0,-
...
"""

CSV_HEADERS = ["reading", "position", "type"]
ROW_SIZE = 8 + 1  # 1 byte newline
HEADER_SIZE = 21 + 1  # 1 byte newline
LINES_TO_JUMP = 100


def lambda_handler(event: Dict[str, str], context: LambdaContext):
    sample_csv = S3Object(bucket=event["bucket"], key=event["key"])

    # Skip the header line
    sample_csv.seek(HEADER_SIZE, io.SEEK_SET)

    # Jump 100 lines of 9 bytes each (8 bytes of data + 1 byte newline)
    sample_csv.seek(LINES_TO_JUMP * ROW_SIZE, io.SEEK_CUR)

    sample_csv.transform(CsvTransform(), in_place=True)
    for row in sample_csv:
        print(row["reading"])
You can build your own custom data transformation by extending the BaseTransform class. The transform method receives an IO[bytes] object, and you are responsible for returning an IO[bytes] object.
import io
from typing import IO, Optional

import ijson

from aws_lambda_powertools.utilities.streaming.transformations import BaseTransform


# Using io.RawIOBase gets us default implementations of many of the common IO methods
class JsonDeserializer(io.RawIOBase):
    def __init__(self, input_stream: IO[bytes]):
        self.input = ijson.items(input_stream, "", multiple_values=True)

    def read(self, size: int = -1) -> Optional[bytes]:
        raise NotImplementedError(f"{__name__} does not implement read")

    def readline(self, size: Optional[int] = None) -> bytes:
        raise NotImplementedError(f"{__name__} does not implement readline")

    def read_object(self) -> dict:
        return self.input.__next__()

    def __next__(self):
        return self.read_object()


class JsonTransform(BaseTransform):
    def transform(self, input_stream: IO[bytes]) -> JsonDeserializer:
        return JsonDeserializer(input_stream=input_stream)
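With that in place, here is a minimal usage sketch, assuming the S3 object contains newline-delimited JSON documents:

from typing import Dict

from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: Dict[str, str], context: LambdaContext):
    s3 = S3Object(bucket=event["bucket"], key=event["key"])

    # Apply the custom transformation defined in the previous snippet
    data = s3.transform(JsonTransform())
    for obj in data:
        print(obj)  # each item is a parsed JSON document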
We make multiple API calls to S3 as you read chunks from your S3 object. If your function is decorated with Tracer, you can easily hit the AWS X-Ray 64K segment size limit when processing large files.
Instead, use tracer decorators only on the parts of your code that don't read from your S3Object.
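For illustration, a minimal sketch (the enrich_row helper is hypothetical): decorate only the business logic that never touches the stream, and leave the handler that reads the S3Object undecorated:

from typing import Dict

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform, GzipTransform
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


@tracer.capture_method
def enrich_row(row: dict) -> dict:
    # hypothetical business logic that never reads from the stream; safe to trace
    return {**row, "processed": True}


# The handler is intentionally not decorated with @tracer.capture_lambda_handler,
# so the many chunked S3 reads don't inflate the X-Ray segment
def lambda_handler(event: Dict[str, str], context: LambdaContext):
    s3 = S3Object(bucket=event["bucket"], key=event["key"])
    data = s3.transform([GzipTransform(), CsvTransform()])
    for row in data:
        enrich_row(row)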