PySpark S3 Small Files Compaction
from pyspark.sql import SparkSession
import boto3
from datetime import datetime
import logging


def create_optimized_spark_session(app_name="S3SmallFilesProcessor"):
    """
    Creates a Spark session optimized for handling many small files in S3.
    """
    spark = (SparkSession.builder
             .appName(app_name)
             # Optimize for small files: pack many small files into one input partition
             .config("spark.sql.files.maxPartitionBytes", "128MB")
             .config("spark.sql.files.openCostInBytes", "1048576")  # 1MB per-file open cost
             # v2 output committer: skips the slow job-level rename on commit
             .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
             # S3-specific configurations
             .config("spark.hadoop.fs.s3a.fast.upload", "true")
             .config("spark.hadoop.fs.s3a.multipart.size", "104857600")  # 100MB
             .config("spark.hadoop.fs.s3a.block.size", "33554432")  # 32MB
             .config("spark.hadoop.fs.s3a.connection.maximum", "100")
             # Adaptive query execution coalesces small shuffle partitions at runtime
             .config("spark.sql.adaptive.enabled", "true")
             .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
             .getOrCreate())
    return spark
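

# A rough sketch (not part of the original script) of how the two file-packing
# settings above interact. Spark charges each file its real size plus
# spark.sql.files.openCostInBytes and packs files into input partitions of at
# most spark.sql.files.maxPartitionBytes; the estimate below ignores the extra
# per-core adjustment Spark also applies, so treat it as an approximation only.
def estimate_input_partitions(file_sizes_bytes,
                              max_partition_bytes=128 * 1024 * 1024,
                              open_cost_bytes=1024 * 1024):
    """Hypothetical helper: approximate how many input partitions Spark creates."""
    total = sum(size + open_cost_bytes for size in file_sizes_bytes)
    return max(1, (total + max_partition_bytes - 1) // max_partition_bytes)  # ceiling division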


def list_s3_files(bucket, prefix):
    """
    Lists all objects in an S3 bucket under the given prefix as s3a:// paths.
    """
    s3_client = boto3.client('s3')
    files = []
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            files.append(f"s3a://{bucket}/{obj['Key']}")
    return files
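

# Hypothetical companion to list_s3_files (an addition, not in the original):
# the same list_objects_v2 paginator also reports each object's Size, so the
# total bytes under a prefix can be measured from the listing alone, without
# reading any data through Spark.
def get_s3_prefix_size(bucket, prefix):
    """Returns the total size in bytes of all objects under the given prefix."""
    s3_client = boto3.client('s3')
    paginator = s3_client.get_paginator('list_objects_v2')
    total_bytes = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            total_bytes += obj['Size']
    return total_bytes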


def coalesce_small_files(spark, input_path, output_path, partition_cols=None, target_size_mb=128):
    """
    Reads small files and writes them back as larger, optimized files.

    Args:
        spark: SparkSession object
        input_path: S3 path to input files
        output_path: S3 path for output files
        partition_cols: List of columns to partition by
        target_size_mb: Target size for output files in MB

    Returns:
        True if the data was rewritten successfully, False otherwise.
    """
    try:
        # Read the input data
        df = spark.read.option("mergeSchema", "true").parquet(input_path)

        # Estimate the data size to pick a partition count. Note: this is a rough
        # heuristic based on the string form of each row and it forces a full pass
        # over the data; on-disk Parquet is compressed, so the resulting file
        # sizes will only approximate target_size_mb.
        total_size_bytes = df.rdd.map(lambda row: len(str(row))).sum()
        target_size_bytes = target_size_mb * 1024 * 1024
        num_partitions = max(1, int(total_size_bytes / target_size_bytes))

        # Repartition and write optimized files
        if partition_cols:
            df = df.repartition(num_partitions, *partition_cols)
            df.write.mode("overwrite").partitionBy(*partition_cols).parquet(output_path)
        else:
            df = df.repartition(num_partitions)
            df.write.mode("overwrite").parquet(output_path)

        return True
    except Exception as e:
        logging.error(f"Error processing files: {e}")
        return False
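

# A lighter-weight alternative sketch, assuming the hypothetical get_s3_prefix_size
# helper above: size the output by the bytes already sitting in S3 instead of
# scanning every row. Because input and output are both Parquet, the listed size
# is usually a reasonable proxy for the rewritten size.
def coalesce_small_files_by_listing(spark, bucket, input_prefix, output_path,
                                    partition_cols=None, target_size_mb=128):
    total_size_bytes = get_s3_prefix_size(bucket, input_prefix)
    num_partitions = max(1, total_size_bytes // (target_size_mb * 1024 * 1024))
    logging.info(f"Targeting {num_partitions} output files for {total_size_bytes} input bytes")
    df = spark.read.option("mergeSchema", "true").parquet(f"s3a://{bucket}/{input_prefix}")
    writer = df.repartition(num_partitions, *(partition_cols or [])).write.mode("overwrite")
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)
    writer.parquet(output_path)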


def process_small_files(bucket, input_prefix, output_prefix, partition_cols=None):
    """
    Main entry point: compacts the small files under one S3 prefix into another.
    """
    # Initialize Spark with optimized settings
    spark = create_optimized_spark_session()
    try:
        # Timestamped output path so repeated runs do not overwrite each other
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = f"s3a://{bucket}/{output_prefix}/processed_{timestamp}"

        # List input files
        input_files = list_s3_files(bucket, input_prefix)
        if not input_files:
            logging.warning(f"No files found in s3a://{bucket}/{input_prefix}")
            return
        logging.info(f"Found {len(input_files)} files to process")

        # Process the files
        success = coalesce_small_files(
            spark=spark,
            input_path=f"s3a://{bucket}/{input_prefix}",
            output_path=output_path,
            partition_cols=partition_cols
        )
        if success:
            logging.info(f"Successfully processed files. Output written to: {output_path}")
        else:
            logging.error("Failed to process files")
    finally:
        # Stop the session even when no files are found or an error occurs
        spark.stop()
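

# Optional verification sketch (an addition, not part of the original flow):
# read the compacted output back and count the underlying files with
# DataFrame.inputFiles(), available in recent PySpark releases.
def report_output_file_count(spark, output_path):
    compacted = spark.read.parquet(output_path)
    file_count = len(compacted.inputFiles())
    logging.info(f"Compacted output at {output_path} consists of {file_count} file(s)")
    return file_count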


if __name__ == "__main__":
    # Configuration
    BUCKET = "bucket-name"
    INPUT_PREFIX = "raw/data"
    OUTPUT_PREFIX = "processed/data"
    PARTITION_COLS = ["year", "month", "day"]

    # Set up logging
    logging.basicConfig(level=logging.INFO)

    # Process the files
    process_small_files(
        bucket=BUCKET,
        input_prefix=INPUT_PREFIX,
        output_prefix=OUTPUT_PREFIX,
        partition_cols=PARTITION_COLS
    )
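

# One way to run this script (an assumption about the runtime environment, not
# something the original specifies): the s3a:// scheme requires the hadoop-aws
# module on the classpath, e.g. via spark-submit with a version matching the
# cluster's Hadoop build; the file name below is illustrative.
#
#   spark-submit --packages org.apache.hadoop:hadoop-aws:3.3.4 compact_s3_small_files.py
#
# Many managed Spark platforms already bundle an S3 connector, in which case
# the --packages flag can be omitted.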