from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, current_date, date_sub, sum  # note: shadows the built-in sum

spark = SparkSession.builder \
    .appName("Medallion Architecture") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.executor.instances", "256") \
    .config("spark.sql.shuffle.partitions", "1024") \
    .getOrCreate()
# Paths for the Bronze, Silver, and Gold layers
bronze_path = "/mnt/delta/bronze"
silver_path = "/mnt/delta/silver"
gold_path = "/mnt/delta/gold"
# ----------- Bronze Layer (Raw Data Ingestion) ----------------------------- #
raw_data = spark.read.format("json").load("/mnt/raw_data/")  # read is a property, not a method
raw_data = raw_data.repartition(1024)
raw_data.write.format("delta").mode("overwrite").save(bronze_path)
# ----------- Silver Layer (Data Cleaning and Transformation) --------------- #
bronze_df = spark.read.format("delta").load(bronze_path)
# Filter data for today and the last 365 days
silver_df = bronze_df.filter(
    (col("date") >= date_sub(current_date(), 365)) & (col("date") <= current_date())
)
silver_df = silver_df.dropDuplicates().filter(col("status").isNotNull())
silver_df = silver_df.repartition(365, col("date"))
silver_df.write.format("delta").mode("overwrite").save(silver_path)
# ----------- Gold Layer (Aggregated and Optimized Data) ------------------ #
silver_df = spark.read.format("delta").load(silver_path)
# Perform aggregations for reporting.
gold_df = (
    silver_df.groupBy("category")
    .agg(
        count("order_id").alias("total_orders"),
        sum("sales_amount").alias("total_sales"),
    )
)
# The aggregated output only has "category", "total_orders", and "total_sales",
# so repartitioning on "date" would fail; partition by category instead.
gold_df = gold_df.repartition(100, col("category"))
gold_df.write.format("delta").mode("overwrite").save(gold_path)
spark.stop()
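The Silver-layer window filter above keeps only rows whose `date` falls between `date_sub(current_date(), 365)` and `current_date()`. A minimal plain-Python sketch of that same predicate (the function name and the fixed sample dates are illustrative, not part of the pipeline) can help confirm the boundary logic:

```python
from datetime import date, timedelta

def in_last_365_days(d, today=None):
    """Mirror the Silver filter: date_sub(current_date(), 365) <= d <= current_date()."""
    today = today or date.today()
    return today - timedelta(days=365) <= d <= today

# Fix "today" so the example is deterministic.
today = date(2024, 6, 1)
print(in_last_365_days(date(2024, 1, 15), today))   # inside the window -> True
print(in_last_365_days(date(2022, 12, 31), today))  # older than 365 days -> False
```

Note that both endpoints are inclusive, matching the `>=` and `<=` comparisons in the Spark filter.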
Consider a dataset with three columns: category, order_id, and sales_amount. Grouping by the category column, the aggregation counts the total number of orders and sums the sales amounts for each category.
| category | order_id | sales_amount |
| --- | --- | --- |
| Grocery | 1 | 100 |
| Electronics | 2 | 200 |
| Grocery | 3 | 150 |
| Electronics | 4 | 300 |
| Grocery | 5 | 100 |
After the aggregation, each row of the result corresponds to one category, with two computed columns: total_orders and total_sales.
| category | total_orders | total_sales |
| --- | --- | --- |
| Grocery | 3 | 350 |
| Electronics | 2 | 500 |
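The same aggregation can be sanity-checked in plain Python on the five sample rows above (this sketch mirrors `groupBy("category").agg(count(...), sum(...))` with a dictionary; it is an illustration, not part of the Spark pipeline):

```python
from collections import defaultdict

# Sample rows from the table above: (category, order_id, sales_amount)
rows = [
    ("Grocery", 1, 100),
    ("Electronics", 2, 200),
    ("Grocery", 3, 150),
    ("Electronics", 4, 300),
    ("Grocery", 5, 100),
]

# Group by category: count orders and sum sales per group.
totals = defaultdict(lambda: {"total_orders": 0, "total_sales": 0})
for category, order_id, sales_amount in rows:
    totals[category]["total_orders"] += 1
    totals[category]["total_sales"] += sales_amount

print(dict(totals))
# Grocery: 3 orders, 350 total; Electronics: 2 orders, 500 total
```

The computed totals match the Gold-layer result table: Grocery has 3 orders summing to 350, and Electronics has 2 orders summing to 500.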