PySpark S3 Small Files Compaction


from pyspark.sql import SparkSession
import boto3
from datetime import datetime
import logging

def create_optimized_spark_session(app_name="S3SmallFilesProcessor"):
    """
    Creates a Spark session optimized for handling many small files in S3
    """
    spark = (SparkSession.builder
        .appName(app_name)
        # Optimize for small files
        .config("spark.sql.files.maxPartitionBytes", "128MB")
        .config("spark.sql.files.openCostInBytes",   "1048576") # 1MB
        .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")

        # S3 specific configurations
        .config("spark.hadoop.fs.s3a.fast.upload",        "true")
        .config("spark.hadoop.fs.s3a.multipart.size",     "104857600") # 100MB
        .config("spark.hadoop.fs.s3a.block.size",         "33554432")  # 32MB
        .config("spark.hadoop.fs.s3a.connection.maximum", "100")

        # Adaptive query execution: coalesce small shuffle partitions at runtime
        .config("spark.sql.adaptive.enabled",                    "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .getOrCreate())
    
    return spark
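
# Optional sanity check (not part of the original flow): a small helper to confirm
# the small-file settings took effect and to see how many read partitions Spark
# builds for a prefix. The function name and the sample_path argument are
# illustrative assumptions, not part of the pipeline below.
def inspect_read_partitioning(spark, sample_path):
    """
    Hypothetical helper: prints the effective small-file settings and the number
    of input partitions Spark creates for sample_path. Fewer, larger partitions
    indicate that small files are being packed together at read time.
    """
    print("maxPartitionBytes:", spark.conf.get("spark.sql.files.maxPartitionBytes"))
    print("adaptive enabled: ", spark.conf.get("spark.sql.adaptive.enabled"))
    df = spark.read.parquet(sample_path)
    print("read partitions:  ", df.rdd.getNumPartitions())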

def list_s3_files(bucket, prefix):
    """
    Lists all files in an S3 bucket with given prefix
    """
    s3_client = boto3.client('s3')
    files = []
    
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
    
    for page in pages:
        if 'Contents' in page:
            for obj in page['Contents']:
                files.append(f"s3a://{bucket}/{obj['Key']}")
    
    return files
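
# Optional variant (an assumption, not in the original script): list_objects_v2
# also returns a per-object 'Size', so the same pagination loop can report the
# total compressed size of the input. That figure can drive the output-file count
# estimate without the full Spark scan used in coalesce_small_files below.
def list_s3_files_with_total_size(bucket, prefix):
    """
    Hypothetical helper: returns (file_paths, total_size_bytes) for a prefix.
    """
    s3_client = boto3.client('s3')
    files = []
    total_size_bytes = 0

    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            files.append(f"s3a://{bucket}/{obj['Key']}")
            total_size_bytes += obj['Size']

    return files, total_size_bytes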

def coalesce_small_files(spark, input_path, output_path, partition_cols=None, target_size_mb=128):
    """
    Reads small files and writes them back as larger, optimized files
    
    Args:
        spark: SparkSession object
        input_path: S3 path to input files
        output_path: S3 path for output files
        partition_cols: List of columns to partition by
        target_size_mb: Target size for output files in MB

    Returns:
        True on success, False on failure.
    """
    try:
        # Read the input data
        df = spark.read.option("mergeSchema", "true").parquet(input_path)
        
        # Rough size estimate: sum the string length of every row. This forces a
        # full scan and overestimates the compressed on-disk size, so output files
        # tend to land somewhat below target_size_mb; treat it as a heuristic.
        total_size_bytes = df.rdd.map(lambda x: len(str(x))).sum()
        target_size_bytes = target_size_mb * 1024 * 1024
        num_partitions = max(1, int(total_size_bytes / target_size_bytes))
        
        # Repartition and write optimized files
        if partition_cols:
            # Hash-partition by the partition columns so rows destined for each
            # output directory are concentrated in as few tasks (and files) as possible
            df = df.repartition(num_partitions, *partition_cols)
            df.write.mode("overwrite").partitionBy(*partition_cols).parquet(output_path)
        else:
            df = df.repartition(num_partitions)
            df.write.mode("overwrite").parquet(output_path)
            
        return True
        
    except Exception as e:
        logging.exception(f"Error processing files: {e}")
        return False
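
# Alternative sketch (an assumption, not the original approach): instead of
# estimating bytes, cap the rows per output file with the standard DataFrameWriter
# option "maxRecordsPerFile". The 1,000,000-row cap and the function name are
# illustrative. Because repartition() is given no explicit partition count here,
# the AQE coalescePartitions setting enabled above may merge small shuffle
# partitions before the write.
def compact_with_row_cap(spark, input_path, output_path, partition_cols, max_records_per_file=1_000_000):
    """Hypothetical variant of coalesce_small_files that bounds rows per output file."""
    df = spark.read.option("mergeSchema", "true").parquet(input_path)
    (df.repartition(*partition_cols)  # shuffle by partition values only
        .write
        .mode("overwrite")
        .option("maxRecordsPerFile", max_records_per_file)  # upper bound on rows per file
        .partitionBy(*partition_cols)
        .parquet(output_path))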

def process_small_files(bucket, input_prefix, output_prefix, partition_cols=None):
    """
    Main function to process small files in S3
    """
    # Initialize Spark with optimized settings
    spark = create_optimized_spark_session()
    
    # Process timestamp for output path
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = f"s3a://{bucket}/{output_prefix}/processed_{timestamp}"
    
    # List input files
    input_files = list_s3_files(bucket, input_prefix)
    if not input_files:
        logging.warning(f"No files found in s3://{bucket}/{input_prefix}")
        return
    
    logging.info(f"Found {len(input_files)} files to process")
    
    # Process the files
    success = coalesce_small_files(
        spark=spark,
        input_path=f"s3a://{bucket}/{input_prefix}",
        output_path=output_path,
        partition_cols=partition_cols
    )
    
    if success:
        logging.info(f"Successfully processed files. Output written to: {output_path}")
    else:
        logging.error("Failed to process files")
    
    spark.stop()

if __name__ == "__main__":
    # Configuration
    BUCKET         = "bucket-name"
    INPUT_PREFIX   = "raw/data"
    OUTPUT_PREFIX  = "processed/data"
    PARTITION_COLS = ["year", "month", "day"]
    
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    
    # Process the files
    process_small_files(
        bucket=BUCKET,
        input_prefix=INPUT_PREFIX,
        output_prefix=OUTPUT_PREFIX,
        partition_cols=PARTITION_COLS
    )