Efficiently Streaming a Large AWS S3 File via S3 Select
Publish Date: Apr 6 '21
AWS S3 is an industry-leading object storage service. We tend to store lots of data files on S3, and at times we need to process them. If the file we are processing is small, we can go with the traditional processing flow: fetch the file from S3 and process it row by row. But what if the file is large, say > 1GB? 😓
Importing (reading) a large file leads to an Out of Memory error and can even crash the system. Libraries such as Pandas and Dask are very good at processing large files, but they require the file to be present locally, i.e. we would have to download it from S3 to our machine first. But what if we do not want to fetch and store the whole S3 file locally? 🤔
📜 Let's consider some of the use-cases:
We want to process a large CSV S3 file (~2GB) every day. It must be processed within a certain time frame (e.g. within 4 hours).
We are required to regularly process large S3 files arriving from an FTP server. New files come in at certain intervals and are to be processed sequentially, i.e. an older file has to be processed before we start processing the newer ones.
These are scenarios where local processing may impact the overall flow of the system. Also, if we are running these file-processing units in containers, we have limited disk space to work with. Hence, a cloud streaming flow is needed (one which can also parallelize the processing by streaming different chunks of the same file in parallel threads/processes). This is where I came across the AWS S3 Select feature. 😎
📝 This post focuses on streaming a large file into smaller manageable chunks (sequentially). This approach can then be used to parallelize the processing by running concurrent threads/processes. Check my next post on this.
S3 Select
With Amazon S3 Select, you can use simple structured query language (SQL) statements to filter the contents of Amazon S3 objects and retrieve just the subset of data that you need. Using Amazon S3 Select to filter this data, you can reduce the amount of data that Amazon S3 transfers, reducing the cost and latency to retrieve this data.
Amazon S3 Select works on objects stored in CSV, JSON, or Apache Parquet format. It also works with objects that are compressed with GZIP or BZIP2 (for CSV and JSON objects only) and server-side encrypted objects. You can specify the format of the results as either CSV or JSON, and you can determine how the records in the result are delimited.
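To make the filtering idea concrete, here is a hypothetical request body (the bucket, key, and column names are invented for illustration) that pushes a WHERE clause into the query so only matching rows ever leave S3:

```python
# Hypothetical S3 Select request: bucket, key, and column names are
# illustrative only, not from a real dataset.
request = {
    'Bucket': 'my-bucket',
    'Key': 'data/users.csv',
    'ExpressionType': 'SQL',
    # Only rows satisfying the WHERE clause are returned by S3
    'Expression': "SELECT s.id, s.name FROM S3Object s WHERE CAST(s.age AS INT) > 30",
    'InputSerialization': {'CSV': {'FileHeaderInfo': 'USE'}},
    'OutputSerialization': {'JSON': {}},
}
# response = s3_client.select_object_content(**request)
```

Passing this to select_object_content(**request) would return only the filtered rows, already serialized as JSON records.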
📝 We will be using Python's boto3 to accomplish our end goal.
```python
response = s3_client.select_object_content(
    Bucket=bucket,
    Key=key,
    ExpressionType='SQL',
    Expression='SELECT * FROM S3Object',
    InputSerialization={'CSV': {'FileHeaderInfo': 'USE', 'FieldDelimiter': ',', 'RecordDelimiter': '\n'}},
    OutputSerialization={'JSON': {'RecordDelimiter': ','}}
)
```
In the above request, InputSerialization determines the S3 file type and related properties, while OutputSerialization determines the format of the response that we get from select_object_content().
🌫️ Streaming Chunks
Now, as we have got some idea about how the S3 Select works, let's try to accomplish our use-case of streaming chunks (subset) of a large file like how a paginated API works. 😋
S3 Select supports the ScanRange parameter, which helps us stream a subset of an object by specifying a range of bytes to query. S3 Select requests can be issued for a series of non-overlapping scan ranges. Scan ranges don't need to be aligned with record boundaries: a record that starts within the specified scan range but extends beyond it will be processed in full by the query. In other words, a row that begins within the scan range is fetched whole, even if it extends past the range's end. S3 Select never fetches a subset of a row; either the whole row is fetched or it is skipped (to be fetched in another scan range).
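The bookkeeping for such a series of non-overlapping scan ranges is plain byte arithmetic. A minimal sketch (the helper name is mine, not part of the S3 API):

```python
def scan_ranges(file_size: int, chunk_bytes: int):
    """Yield contiguous, non-overlapping (Start, End) byte ranges covering the file."""
    start = 0
    while start < file_size:
        end = min(start + chunk_bytes, file_size)
        yield start, end
        start = end  # the next range begins exactly where this one ended


print(list(scan_ranges(12000, 5000)))
# [(0, 5000), (5000, 10000), (10000, 12000)]
```

Note that each range starts exactly where the previous one ended; since rows are never split across ranges, this covers every row exactly once.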
Let's try to achieve this in 2 simple steps:
1. Find the total bytes of the S3 file
The following code snippet showcases the function that performs a HEAD request on our S3 file and determines its size in bytes.
```python
import logging

import boto3
from botocore.exceptions import ClientError
from flask import current_app  # the app config holds the AWS profile name

logger = logging.getLogger(__name__)


def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of an S3 object via a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
```
2. Create a generator to stream the chunks
Now, the logic is to yield chunks of the byte stream of the S3 file until we reach the file size. Rest assured, this continuous scan range won't result in overlapping rows in the response 😉 (check the output image / GitHub repo). Simple enough, eh? 😝
```python
import ast
import logging

import boto3
from botocore.exceptions import ClientError
from flask import current_app  # the app config holds the AWS profile name

logger = logging.getLogger(__name__)


def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        file_size (int): Total size of the file in bytes
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000

    Yields:
        tuple[dict]: A tuple of dictionaries containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={'CSV': {'FileHeaderInfo': 'USE', 'FieldDelimiter': ',', 'RecordDelimiter': '\n'}},
            OutputSerialization={'JSON': {'RecordDelimiter': ','}},
            ScanRange={'Start': start_range, 'End': end_range},
        )
        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = '<s3-bucket>'
    key = '<s3-key>'
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
```
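One subtlety worth calling out: with OutputSerialization's RecordDelimiter set to ',', the joined payload ends up looking like '{...},{...},', which Python happens to parse as a tuple literal; that is why ast.literal_eval returns a tuple of dicts. A standalone illustration with a simulated payload:

```python
import ast

# Simulated payload: two JSON records delimited by commas, as produced by
# OutputSerialization={'JSON': {'RecordDelimiter': ','}}
payload = '{"id": "1", "name": "a"},{"id": "2", "name": "b"},'
rows = ast.literal_eval(payload)
print(rows)
# ({'id': '1', 'name': 'a'}, {'id': '2', 'name': 'b'})
```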
Congratulations! 👏 We have successfully managed to solve one of the key challenges of processing a large S3 file without crashing our system. 🤘
📌 You can check out my GitHub repository for a complete working example of this approach 👇
AWS S3 Select Demo
This project showcases the rich AWS S3 Select feature to stream a large data file in a paginated style.
Currently, S3 Select does not support OFFSET, and hence we cannot paginate the results of the query that way. Instead, we use the ScanRange feature to stream the contents of the S3 file.
Hi Idris, Great post! I tried to do a similar code where I select all data from a s3 file and recreated this same file locally with the same exact format. But after building the file I noticed that the local file had fewer records than the real one. As I increased the chunk size of the scan range the difference between the s3 file and the local file diminished. Do you have any idea why this might happen? Obs: file format: CSV , No compression, 5000 and 20000 bytes chunk range used for the tests. Once again, thank you for the post.
Hi,
Glad that you liked the post and it helped you in your use-case.
With this process of streaming the data, you have to keep retrieving the file chunk from S3 until you reach the total file size. I would recommend to clone this repo and compare with your local code to identify if you missed something 😉
Optionally, I would recommend to also check out the sequel to this post for parallel processing 😁
Thank you! I found out what I was missing, I made the start_byte = end_byte + 1. Losing one row per chunk. Your next article was exact what I was looking for for the next step of my program.