Medallion Architecture with Partitioning



spark = SparkSession.builder \
  .appName("Medallion Architecture") \
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
  .config("spark.executor.instances", "256") \
  .config("spark.sql.shuffle.partitions", "1024") \
  .getOrCreate()

# Paths for the Bronze, Silver, and Gold layers
bronze_path = "/mnt/delta/bronze"
silver_path = "/mnt/delta/silver"
gold_path   = "/mnt/delta/gold"

# ----------- Bronze Layer (Raw Data Ingestion) ----------------------------- #
raw_data = spark.read().format("json").load("/mnt/raw_data/")
raw_data = raw_data.repartition(1024)
raw_data.write().format("delta").mode("overwrite").save(bronze_path)


# ----------- Silver Layer (Data Cleaning and Transformation) --------------- #
bronze_df = spark.read().format("delta").load(bronze_path)

# Filter data for today and the last 365 days
silver_df = bronze_df.filter(
    (col("date") >= date_sub(current_date(), 365)) & (col("date") <= current_date())
)

silver_df = silver_df.dropDuplicates().filter(col("status").isNotNull())
silver_df = silver_df.repartition(365, col("date"))
silver_df.write().format("delta").mode("overwrite").save(silver_path)


# ----------- Gold Layer (Aggregated and Optimized Data) ------------------ #
silver_df = spark.read().format("delta").load(silver_path)

# Perform aggregations for reporting.
gold_df   = silver_df.groupBy("category") 
                     .agg( 
                          count("order_id").alias("total_orders"), 
                          sum("sales_amount").alias("total_sales") 
                         ) 
gold_df = gold_df.repartition(100, col("category"), col("date"))
gold_df.write().format("delta").mode("overwrite").save(gold_path)

spark.stop()
    

PySpark Aggregation Tables


Data Description

Dataset containing three columns: category, order_id, and sales_amount. The dataset is grouped by the category column, and we perform aggregation to count the total number of orders and sum up the sales amounts for each category.

Original Dataset

category order_id sales_amount
Grocery 1 100
Electronics 2 200
Grocery 3 150
Electronics 4 300
Grocery 5 100

Aggregated Result

After performing the aggregation, the dataset is grouped by category, and we compute two new columns: total_orders and total_sales.

category total_orders total_sales
Grocery 3 350
Electronics 2 500