Python and Boto3: Automating S3 Data Operations
Amazon S3 is the foundation of nearly every AWS data architecture: a durable, virtually unlimited object store that acts as the raw zone, transformed zone, and serving layer of most data lakes. If you are working in data engineering on AWS, you will write Boto3 code to interact with S3 constantly. This tutorial covers the operations that come up most in production: uploading files, listing objects at scale, handling large file uploads correctly, generating time-limited access URLs, and moving objects between buckets.
All examples use real, runnable Python code. Set up your AWS credentials via aws configure or environment variables before running them.
Setting Up Boto3
Install Boto3 and configure your environment:
pip install boto3 botocore
Boto3 provides two interfaces: boto3.client gives you a low-level interface that maps directly to AWS API calls, while boto3.resource provides a higher-level object-oriented abstraction. Both are useful: the client is more explicit and better for operations that need fine-grained control; the resource is more readable for simple operations.
import boto3
from botocore.exceptions import ClientError
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Client interface: explicit, low-level
s3_client = boto3.client("s3", region_name="ca-central-1")
# Resource interface: higher-level, object-oriented
s3_resource = boto3.resource("s3", region_name="ca-central-1")
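Always specify a region. If you rely on an implicit default such as us-east-1 while your bucket is in ca-central-1, you can hit subtle bugs: cross-region redirects, signatures computed for the wrong region, and differences in how path-style URLs behave.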
Uploading Files to S3
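For files under ~100 MB, upload_file and put_object both work well. The key difference: upload_file streams from disk (memory-efficient) and switches to multipart upload for larger files, while put_object sends the whole body in a single request, which usually means holding the content in memory.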
from boto3.exceptions import S3UploadFailedError

def upload_file(local_path: str, bucket: str, s3_key: str) -> bool:
    """Upload a local file to S3 with error handling."""
    try:
        s3_client.upload_file(
            Filename=local_path,
            Bucket=bucket,
            Key=s3_key,
            ExtraArgs={
                "ContentType": "application/octet-stream",
                "ServerSideEncryption": "AES256",
            },
        )
        logger.info(f"Uploaded {local_path} to s3://{bucket}/{s3_key}")
        return True
    # The managed transfer wraps service errors in S3UploadFailedError,
    # so catch it alongside ClientError.
    except (ClientError, S3UploadFailedError) as e:
        logger.error(f"Upload failed: {e}")
        return False
def upload_dataframe_as_parquet(df, bucket: str, s3_key: str) -> None:
    """Upload a pandas DataFrame directly to S3 as Parquet without writing to disk."""
    import io

    # Serialize the DataFrame into an in-memory buffer
    buffer = io.BytesIO()
    df.to_parquet(buffer, engine="pyarrow", index=False)
    buffer.seek(0)
    s3_client.put_object(
        Bucket=bucket,
        Key=s3_key,
        Body=buffer.getvalue(),
        ContentType="application/octet-stream",
        ServerSideEncryption="AES256",
    )
    logger.info(f"Uploaded DataFrame to s3://{bucket}/{s3_key}")
Use ExtraArgs to set metadata, content type, ACLs, or server-side encryption. Always enable server-side encryption for data buckets: it is a one-line addition and most compliance frameworks require it.
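The upload examples above hard-code the content type. As a minimal sketch, the ExtraArgs dictionary could instead be derived from the file name with the standard library's mimetypes module. The helper name build_extra_args and the metadata keys are assumptions made for illustration; they are not part of Boto3.

```python
import mimetypes
from typing import Optional


def build_extra_args(local_path: str, metadata: Optional[dict] = None) -> dict:
    """Build an ExtraArgs dict for upload_file (hypothetical helper, not a Boto3 API)."""
    # guess_type returns (type, encoding); type is None for unknown extensions
    content_type, _ = mimetypes.guess_type(local_path)
    extra_args = {
        "ContentType": content_type or "application/octet-stream",
        "ServerSideEncryption": "AES256",
    }
    if metadata:
        # User-defined metadata values must be strings
        extra_args["Metadata"] = {key: str(value) for key, value in metadata.items()}
    return extra_args


# Usage, assuming the s3_client defined earlier and an illustrative bucket name:
# s3_client.upload_file(
#     "notes.txt", "my-data-lake", "raw/notes.txt",
#     ExtraArgs=build_extra_args("notes.txt", {"source": "crm"}),
# )
```

Falling back to application/octet-stream keeps the behaviour of the original examples for extensions that mimetypes does not recognise.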
Listing Objects with Pagination
The list_objects_v2 API returns a maximum of 1,000 objects per call. Buckets with millions of objects require pagination: iterating through continuation tokens until there are no more results. The cleanest approach uses Boto3's built-in paginator:
def list_all_objects(bucket: str, prefix: str = "") -> list[dict]:
    """List all objects in a bucket/prefix, handling pagination automatically."""
    paginator = s3_client.get_paginator("list_objects_v2")
    all_objects = []
    page_count = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        page_count += 1
        # "Contents" is missing when a page has no matching objects
        objects = page.get("Contents", [])
        all_objects.extend(objects)
        logger.info(f"Page {page_count}: retrieved {len(objects)} objects")
    logger.info(f"Total objects found: {len(all_objects)}")
    return all_objects

def list_parquet_files(bucket: str, prefix: str) -> list[str]:
    """Return a list of S3 keys for all Parquet files under a prefix."""
    objects = list_all_objects(bucket, prefix)
    return [obj["Key"] for obj in objects if obj["Key"].endswith(".parquet")]

# Example usage
parquet_keys = list_parquet_files(
    bucket="my-data-lake",
    prefix="processed/sales/year=2024/",
)
Paginators are the correct pattern for any listing operation. Never assume a bucket has fewer than 1,000 objects; production buckets routinely have tens of millions.
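For comparison, the continuation-token loop that a paginator hides can be sketched by hand. This version is a generator, so keys are yielded page by page instead of being collected into one large list. The function name iter_keys is an assumption for illustration, and the client is passed in as a parameter rather than taken from module scope.

```python
from typing import Iterator


def iter_keys(client, bucket: str, prefix: str = "") -> Iterator[str]:
    """Yield object keys page by page using explicit continuation tokens."""
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    while True:
        page = client.list_objects_v2(**kwargs)
        # "Contents" is absent when a page has no matching objects
        for obj in page.get("Contents", []):
            yield obj["Key"]
        if not page.get("IsTruncated"):
            break
        # Hand the token from this page back to S3 to request the next page
        kwargs["ContinuationToken"] = page["NextContinuationToken"]


# Usage, assuming the s3_client defined earlier:
# for key in iter_keys(s3_client, "my-data-lake", "processed/sales/"):
#     print(key)
```

In practice the built-in paginator remains the simpler choice; the manual loop is mainly useful for understanding what each page response contains.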
Multipart Uploads for Large Files
The standard upload_file call already uses multipart upload automatically, via the S3 transfer manager, once a file exceeds the multipart threshold (8 MB by default). For very large files you can tune the threshold, chunk size, and concurrency:
from boto3.s3.transfer import TransferConfig

def upload_large_file(local_path: str, bucket: str, s3_key: str) -> None:
    """Upload a large file with tuned multipart configuration."""
    config = TransferConfig(
        multipart_threshold=100 * 1024 * 1024,  # 100 MB: files above this use multipart
        multipart_chunksize=50 * 1024 * 1024,   # 50 MB chunks
        max_concurrency=10,                     # parallel chunk uploads
        use_threads=True,
    )
    s3_client.upload_file(
        Filename=local_path,
        Bucket=bucket,
        Key=s3_key,
        Config=config,
        Callback=ProgressCallback(local_path),
    )
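import os
import threading

class ProgressCallback:
    """Log upload progress for large files."""

    def __init__(self, filename: str):
        self._filename = filename
        self._size = os.path.getsize(filename)
        self._uploaded = 0
        # The callback is invoked from several worker threads, so guard the counter
        self._lock = threading.Lock()

    def __call__(self, bytes_transferred: int):
        with self._lock:
            self._uploaded += bytes_transferred
            # Avoid dividing by zero for empty files
            pct = (self._uploaded / self._size) * 100 if self._size else 100.0
            logger.info(f"Upload progress: {pct:.1f}% ({self._uploaded}/{self._size} bytes)")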
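To get a feel for what the tuned configuration means in practice, the number of requests for a given file size can be estimated with simple arithmetic. This is a rough sketch only: estimate_parts is a hypothetical helper, and the transfer manager may adjust chunk sizes on its own, so treat the result as an approximation.

```python
import math

MAX_PARTS = 10_000  # S3 limit on the number of parts in one multipart upload
MB = 1024 * 1024


def estimate_parts(file_size: int, threshold: int, chunksize: int) -> int:
    """Estimate how many upload requests a file needs (illustrative helper only)."""
    if file_size < threshold:
        return 1  # below the threshold the file is sent as a single request
    return max(1, math.ceil(file_size / chunksize))


# A 1 GiB file with the 100 MB threshold and 50 MB chunks used above
parts = estimate_parts(1024 * MB, threshold=100 * MB, chunksize=50 * MB)
print(parts)  # 21
assert parts <= MAX_PARTS
```

If the estimate approaches the 10,000-part limit, a larger chunk size is the usual remedy.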
Manual multipart upload control is useful when generating parts from a stream rather than a file on disk:
def manual_multipart_upload(bucket: str, s3_key: str, parts_generator):
    """
    Perform a multipart upload using a generator that yields bytes chunks.
    Each chunk must be at least 5 MB (except the last part).
    """
    response = s3_client.create_multipart_upload(
        Bucket=bucket,
        Key=s3_key,
        ServerSideEncryption="AES256",
    )
    upload_id = response["UploadId"]
    parts = []
    try:
        for part_number, chunk in enumerate(parts_generator, start=1):
            part_response = s3_client.upload_part(
                Bucket=bucket,
                Key=s3_key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=chunk,
            )
            # S3 needs each part's number and ETag to assemble the final object
            parts.append({"PartNumber": part_number, "ETag": part_response["ETag"]})
            logger.info(f"Uploaded part {part_number}")
        s3_client.complete_multipart_upload(
            Bucket=bucket,
            Key=s3_key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
        logger.info(f"Multipart upload complete: s3://{bucket}/{s3_key}")
    except Exception as e:
        # Always abort on failure to avoid accumulating incomplete upload charges
        s3_client.abort_multipart_upload(Bucket=bucket, Key=s3_key, UploadId=upload_id)
        logger.error(f"Multipart upload aborted: {e}")
        raise
The abort_multipart_upload call in the exception handler is critical. Incomplete multipart uploads accumulate storage charges until they are cleaned up. Always abort on failure, and add an S3 lifecycle rule to automatically expire incomplete uploads after 7 days as a safety net.
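The lifecycle safety net mentioned above could look roughly like the sketch below. The rule ID and the helper names are assumptions made for illustration. Note that put_bucket_lifecycle_configuration replaces the bucket's entire existing lifecycle configuration, so on a bucket that already has rules you would merge this rule into the current set rather than apply it alone.

```python
def abort_incomplete_uploads_rule(days: int = 7, prefix: str = "") -> dict:
    """Build a lifecycle rule that aborts stale multipart uploads (rule ID is illustrative)."""
    return {
        "ID": f"abort-incomplete-multipart-{days}d",
        "Status": "Enabled",
        "Filter": {"Prefix": prefix},  # an empty prefix applies the rule to the whole bucket
        "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": days},
    }


def apply_abort_rule(client, bucket: str, days: int = 7) -> None:
    """Apply the rule. Caution: this overwrites any lifecycle rules already on the bucket."""
    client.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration={"Rules": [abort_incomplete_uploads_rule(days)]},
    )


# Usage, assuming the s3_client defined earlier and an illustrative bucket name:
# apply_abort_rule(s3_client, "my-data-lake", days=7)
```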
Generating Pre-Signed URLs
Pre-signed URLs allow you to grant time-limited access to a private S3 object without making the object public or sharing your AWS credentials. They are widely used for sharing data files, generating download links for report exports, and allowing external systems to upload directly to S3.
def generate_presigned_download_url(
    bucket: str, s3_key: str, expiry_seconds: int = 3600
) -> str:
    """Generate a pre-signed URL for downloading an S3 object."""
    try:
        url = s3_client.generate_presigned_url(
            ClientMethod="get_object",
            Params={"Bucket": bucket, "Key": s3_key},
            ExpiresIn=expiry_seconds,
        )
        return url
    except ClientError as e:
        logger.error(f"Failed to generate pre-signed URL: {e}")
        raise

def generate_presigned_upload_url(
    bucket: str, s3_key: str, content_type: str, expiry_seconds: int = 900
) -> dict:
    """
    Generate a pre-signed POST URL for direct browser/client uploads.
    Returns fields and URL that the client uses to POST the file.
    """
    response = s3_client.generate_presigned_post(
        Bucket=bucket,
        Key=s3_key,
        Fields={"Content-Type": content_type},
        Conditions=[
            {"Content-Type": content_type},
            ["content-length-range", 1, 500 * 1024 * 1024],  # max 500 MB
        ],
        ExpiresIn=expiry_seconds,
    )
    return response  # {"url": "...", "fields": {...}}
Pre-signed URL expiry is a security consideration. Short-lived URLs (15-60 minutes) are appropriate for most use cases. Avoid URLs with multi-day expiry for sensitive data.
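When auditing or logging the links you hand out, it can help to read the expiry back out of a URL. The sketch below assumes the usual query-string layouts: X-Amz-Date plus X-Amz-Expires for Signature Version 4, and an absolute Expires timestamp for the older Signature Version 2. The function name presigned_url_expiry is hypothetical, and the code only parses the URL; it does not validate the signature.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import parse_qs, urlparse


def presigned_url_expiry(url: str) -> Optional[datetime]:
    """Return the UTC expiry time encoded in a pre-signed URL, or None if absent."""
    query = parse_qs(urlparse(url).query)
    if "X-Amz-Date" in query and "X-Amz-Expires" in query:
        # SigV4 layout: signing time plus a lifetime in seconds
        signed_at = datetime.strptime(
            query["X-Amz-Date"][0], "%Y%m%dT%H%M%SZ"
        ).replace(tzinfo=timezone.utc)
        return signed_at + timedelta(seconds=int(query["X-Amz-Expires"][0]))
    if "Expires" in query:
        # SigV2 layout: an absolute Unix timestamp
        return datetime.fromtimestamp(int(query["Expires"][0]), tz=timezone.utc)
    return None


# Usage, assuming the helper defined earlier:
# url = generate_presigned_download_url("my-data-lake", "reports/q1.parquet")
# logger.info(f"Link valid until {presigned_url_expiry(url)}")
```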
Copying and Moving Objects Between Buckets
S3 does not have a native "move" operation: moving is a copy followed by a delete. For objects under 5 GB, server-side copy is efficient and does not transfer data through your network:
def copy_object(
    source_bucket: str,
    source_key: str,
    dest_bucket: str,
    dest_key: str,
) -> None:
    """Copy an S3 object between buckets (or within the same bucket)."""
    copy_source = {"Bucket": source_bucket, "Key": source_key}
    s3_client.copy_object(
        CopySource=copy_source,
        Bucket=dest_bucket,
        Key=dest_key,
        ServerSideEncryption="AES256",
    )
    logger.info(f"Copied s3://{source_bucket}/{source_key} -> s3://{dest_bucket}/{dest_key}")

def move_object(
    source_bucket: str,
    source_key: str,
    dest_bucket: str,
    dest_key: str,
) -> None:
    """Move (copy + delete) an S3 object."""
    # The delete only runs if the copy did not raise
    copy_object(source_bucket, source_key, dest_bucket, dest_key)
    s3_client.delete_object(Bucket=source_bucket, Key=source_key)
    logger.info(f"Deleted source: s3://{source_bucket}/{source_key}")

def move_prefix(source_bucket: str, source_prefix: str, dest_bucket: str, dest_prefix: str) -> int:
    """Move all objects under a prefix to a new location. Returns count of objects moved."""
    objects = list_all_objects(source_bucket, source_prefix)
    count = 0
    for obj in objects:
        source_key = obj["Key"]
        # Replace the source prefix with the destination prefix
        relative_key = source_key[len(source_prefix):]
        dest_key = dest_prefix + relative_key
        move_object(source_bucket, source_key, dest_bucket, dest_key)
        count += 1
    logger.info(f"Moved {count} objects from {source_prefix} to {dest_prefix}")
    return count
For objects larger than 5 GB, a single copy_object call is not enough. Either drive a multipart copy yourself with upload_part_copy, or call s3_client.copy, a managed transfer that accepts a TransferConfig and handles the multipart logic automatically when copying large objects.
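The move_prefix function above issues one delete request per object. When many objects are involved, one possible refinement is to copy everything first and then remove the sources in batches with delete_objects, which accepts up to 1,000 keys per request. The sketch below assumes that approach; batched and delete_keys are hypothetical helper names, and the client is passed in as a parameter.

```python
from typing import Iterable, Iterator

MAX_KEYS_PER_DELETE = 1000  # delete_objects accepts at most 1,000 keys per request


def batched(keys: Iterable[str], size: int = MAX_KEYS_PER_DELETE) -> Iterator[list]:
    """Split an iterable of keys into lists of at most `size` items."""
    batch = []
    for key in keys:
        batch.append(key)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def delete_keys(client, bucket: str, keys: Iterable[str]) -> list:
    """Delete keys in batches and return any per-key errors reported by S3."""
    errors = []
    for batch in batched(keys):
        response = client.delete_objects(
            Bucket=bucket,
            # Quiet mode limits the response to the keys that failed
            Delete={"Objects": [{"Key": key} for key in batch], "Quiet": True},
        )
        errors.extend(response.get("Errors", []))
    return errors


# Usage, assuming the s3_client defined earlier:
# failed = delete_keys(s3_client, "my-data-lake", ["tmp/a.parquet", "tmp/b.parquet"])
```

Checking the returned list matters because delete_objects can succeed as a request while individual keys fail.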
Putting It All Together: A Practical ETL Helper Module
Here is a minimal, reusable S3 utility module that ties these patterns together:
# s3_utils.py
import boto3
import io
import logging
from botocore.exceptions import ClientError
from boto3.s3.transfer import TransferConfig
logger = logging.getLogger(__name__)

class S3Helper:
    def __init__(self, region: str = "ca-central-1"):
        self.client = boto3.client("s3", region_name=region)
        self.resource = boto3.resource("s3", region_name=region)
        self._transfer_config = TransferConfig(
            multipart_threshold=100 * 1024 * 1024,
            multipart_chunksize=50 * 1024 * 1024,
            max_concurrency=10,
        )

    def upload(self, local_path: str, bucket: str, key: str) -> None:
        self.client.upload_file(local_path, bucket, key, Config=self._transfer_config)

    def download(self, bucket: str, key: str, local_path: str) -> None:
        self.client.download_file(bucket, key, local_path, Config=self._transfer_config)

    def read_parquet(self, bucket: str, key: str):
        import pandas as pd
        obj = self.client.get_object(Bucket=bucket, Key=key)
        return pd.read_parquet(io.BytesIO(obj["Body"].read()))

    def write_parquet(self, df, bucket: str, key: str) -> None:
        buffer = io.BytesIO()
        df.to_parquet(buffer, engine="pyarrow", index=False)
        buffer.seek(0)
        self.client.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue())

    def list_keys(self, bucket: str, prefix: str = "") -> list[str]:
        paginator = self.client.get_paginator("list_objects_v2")
        keys = []
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            keys.extend(obj["Key"] for obj in page.get("Contents", []))
        return keys

    def exists(self, bucket: str, key: str) -> bool:
        try:
            self.client.head_object(Bucket=bucket, Key=key)
            return True
        except ClientError as e:
            # head_object reports a missing key with error code "404"
            if e.response["Error"]["Code"] == "404":
                return False
            raise
This kind of utility module is a practical foundation for any AWS data pipeline. For a higher-level abstraction built on top of Boto3, see our tutorial on AWS Data Wrangler, which handles Parquet, partitioning, and Glue Catalog integration with less boilerplate.
Conclusion
Boto3's S3 API is extensive, but the operations covered here (upload, paginated listing, multipart upload, pre-signed URLs, and cross-bucket copy) handle the vast majority of production data engineering use cases. The key habits to build: always handle pagination, always abort failed multipart uploads, and always specify encryption for data buckets.
If your team is building AWS data pipelines and needs help with architecture, code reviews, or production deployment, get in touch with Infra IT Consulting. We work with Canadian companies to design and deliver reliable, cost-efficient AWS data infrastructure.