Silver Layer: The Conformed Data Foundation

1. Overview

The Silver layer is the cleansed, deduplicated, and conformed data tier of the Medallion Architecture. It sits as a critical bridge between the raw Bronze layer and the business-ready Gold layer. While Bronze contains the unmodified data dump from source systems, Silver applies enterprise data governance through schema enforcement, data quality rules, and business logic standardization.

The Silver layer embodies several key concepts: explicit schemas, deduplication on business keys, consistent cleansing rules, and auditable history.

Silver tables are modified only through controlled, idempotent writes and are versioned through Delta Lake time travel, enabling reproducible analytics and regulatory compliance. A mature Silver layer eliminates redundant cleansing work downstream and provides a reliable foundation for the large majority of analytical queries.
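
Time travel is what makes the reproducibility claim concrete; a minimal sketch, assuming a Silver table at the path used throughout this document (the version number and timestamp are illustrative):

```sql
-- Re-run an analysis against the table exactly as it existed at version 12
SELECT * FROM delta.`/mnt/data/silver/dim_customer` VERSION AS OF 12;

-- Or as of a wall-clock instant, e.g. for an audit
SELECT * FROM delta.`/mnt/data/silver/dim_customer` TIMESTAMP AS OF '2024-01-15 00:00:00';
```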

2. Design Principles

Schema-on-Write Enforcement

Unlike Bronze's flexible schema, Silver enforces a strict schema at write-time using Delta Lake's schema validation. This prevents silent data corruption and catches integration issues early.


from delta.tables import DeltaTable
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Define explicit Silver schema
silver_schema = StructType([
    StructField("customer_id", StringType(), False),  # NOT NULL
    StructField("customer_name", StringType(), False),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("address_line_1", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip_code", StringType(), True),
    StructField("created_at", TimestampType(), False),
    StructField("updated_at", TimestampType(), False),
    StructField("_bronze_loaded_at", TimestampType(), False),
])

# Write with schema enforcement
cleaned_df = bronze_df.select(
    F.col("id").cast("string").alias("customer_id"),
    F.col("name").alias("customer_name"),
    F.col("email"),
    F.col("phone"),
    F.col("address1").alias("address_line_1"),
    F.col("city"),
    F.col("state"),
    F.col("postal_code").alias("zip_code"),
    F.to_timestamp(F.col("created")).alias("created_at"),
    F.current_timestamp().alias("updated_at"),
    F.current_timestamp().alias("_bronze_loaded_at"),  # ideally carried over from Bronze ingestion metadata
)

# Enforce nullability by rebuilding the DataFrame with the explicit schema;
# Delta then validates every append against the stored table schema
# (DataFrameWriter has no .schema() method — schemas apply to reads)
typed_df = spark.createDataFrame(cleaned_df.rdd, silver_schema)

typed_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "false") \
    .save("/mnt/data/silver/dim_customer")

Strongly Typed Columns

Every column in Silver should have an explicit, non-negotiable type. Nullable columns are marked explicitly and validated at query time.


# Define clear type constraints with null validation
silver_df = bronze_df.select(
    F.col("cust_id").cast("string").alias("customer_id"),
    F.col("order_amount").cast("decimal(18,2)").alias("order_amount"),
    F.col("order_date").cast("date").alias("order_date"),
    F.coalesce(F.col("discount_pct"), F.lit(0)).cast("decimal(5,2)").alias("discount_percentage"),
)

# Validate no nulls in business keys
assert silver_df.filter(F.col("customer_id").isNull()).count() == 0, "NULL customer_ids found"
assert silver_df.filter(F.col("order_amount").isNull()).count() == 0, "NULL order amounts found"
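
The same null rules can be pushed into the table itself so every writer is checked at write time, not just this job; a sketch using Delta column and CHECK constraints (the table name is illustrative):

```sql
-- Reject any future write that carries a NULL business key
ALTER TABLE silver.fact_order ALTER COLUMN customer_id SET NOT NULL;

-- Reject negative amounts at write time
ALTER TABLE silver.fact_order ADD CONSTRAINT non_negative_amount CHECK (order_amount >= 0);
```

Both statements require existing data to already satisfy the constraint, so run them after an initial cleansing pass.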

Business Key Identification

The business key is the natural, immutable identifier for an entity. Unlike surrogate keys, business keys are meaningful and stable across systems. They're essential for deduplication and SCD implementations.


# Examples of business keys across domains
business_keys = {
    "dim_customer": ["customer_id"],  # Single attribute
    "fact_order": ["order_id"],
    "dim_product": ["sku"],
    "fact_daily_inventory": ["warehouse_id", "product_sku", "date"],  # Composite key
    "fact_sales_transaction": ["store_id", "register_id", "transaction_timestamp"],
}

# Use business keys to identify and remove duplicates
from pyspark.sql import Window

window_spec = Window.partitionBy("customer_id").orderBy(F.desc("_bronze_loaded_at"))
dedup_df = silver_df.withColumn(
    "row_num",
    F.row_number().over(window_spec)
).filter(F.col("row_num") == 1).drop("row_num")

3. Data Cleansing

Null Handling

Null values must be handled consistently. Options include coalescing to fallback values, forward-fill from the last known value, domain-specific defaults, or quarantining records whose critical fields are null.


from pyspark.sql import Window

# Strategy 1: Coalesce with fallback values
cleansed_df = bronze_df.select(
    F.col("customer_id"),
    F.coalesce(F.col("email"), F.lit("no-email@unknown.com")).alias("email"),
    F.coalesce(F.col("phone"), F.lit("")).alias("phone"),
    F.col("created_at"),
)

# Strategy 2: Forward-fill (carry the last known value forward into nulls)
window_spec = Window.partitionBy("customer_id").orderBy("created_at").rowsBetween(
    Window.unboundedPreceding, 0  # include the current row
)
filled_df = bronze_df.withColumn(
    "phone_filled",
    F.last(F.col("phone"), ignorenulls=True).over(window_spec)
)

# Strategy 3: fillna with domain-specific defaults
domain_defaults = {
    "email": "unknown@example.com",
    "phone": "000-000-0000",
    "address": "N/A",
    "discount_pct": 0.0,
}

filled_df = bronze_df.fillna(domain_defaults)

# Strategy 4: Identify records with critical nulls and quarantine
critical_nulls = bronze_df.filter(
    F.col("customer_id").isNull() |
    F.col("created_at").isNull()
)

valid_df = bronze_df.filter(
    F.col("customer_id").isNotNull() &
    F.col("created_at").isNotNull()
)

# Log quarantined records
if critical_nulls.count() > 0:
    critical_nulls.write \
        .format("delta") \
        .mode("append") \
        .save("/mnt/data/quarantine/dim_customer_nulls")

Type Casting and Validation

Cast columns to their target types with validation. Use try_cast patterns to gracefully handle type failures.


import pyspark.sql.functions as F
from pyspark.sql.types import DecimalType

# Safe type casting with validation
silver_df = bronze_df.select(
    F.col("customer_id").cast("string").alias("customer_id"),
    F.col("order_id").cast("long").alias("order_id"),
    F.col("order_amount").cast(DecimalType(18, 2)).alias("order_amount"),
    F.col("order_date").cast("date").alias("order_date"),
    F.col("created_at").cast("timestamp").alias("created_at"),
)

# Handle casting failures gracefully (Column.try_cast requires Spark 3.5+):
# invalid values become NULL instead of failing the job
def safe_cast_to_decimal(col_name, precision=18, scale=2):
    """Returns the column cast to decimal, with NULL on invalid input."""
    return F.col(col_name).try_cast(DecimalType(precision, scale)).alias(col_name)

result_df = bronze_df.select(
    F.col("order_id"),
    safe_cast_to_decimal("order_amount"),
    safe_cast_to_decimal("tax_amount"),
)

# Log records where the cast produced NULL (note: values that were already
# NULL in Bronze also match this filter)
cast_failures = result_df.filter(
    F.col("order_amount").isNull() | F.col("tax_amount").isNull()
)

if cast_failures.count() > 0:
    cast_failures.write \
        .format("delta") \
        .mode("append") \
        .save("/mnt/data/quarantine/casting_failures")

String Normalization

Apply consistent transformations to text fields: trimming, case normalization, regex cleaning, and special character handling.


import re
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Normalize customer names: trim, title case
normalized_df = bronze_df.select(
    F.col("customer_id"),
    F.initcap(F.trim(F.col("customer_name"))).alias("customer_name"),
    F.lower(F.trim(F.col("email"))).alias("email"),
)

# Remove special characters and multiple spaces from addresses
normalized_df = bronze_df.select(
    F.col("customer_id"),
    F.regexp_replace(
        F.col("address"),
        r"[^\w\s\-\.,#]",  # Keep alphanumeric, space, dash, period, comma, hash
        ""
    ).alias("address_clean"),
)

# Normalize phone numbers: extract digits only
normalized_df = bronze_df.select(
    F.col("customer_id"),
    F.regexp_replace(F.col("phone"), r"[^\d]", "").alias("phone_digits_only"),
)

# Define a UDF for complex string transformations
def normalize_company_name(name):
    """Remove common suffixes and standardize."""
    if name is None:
        return None
    name = name.strip().upper()
    name = re.sub(r"\s+(INC|LLC|LTD|CORP|CO)\.?$", "", name)
    name = re.sub(r"\s+", " ", name)  # Collapse multiple spaces
    return name

normalize_udf = F.udf(normalize_company_name, StringType())

normalized_df = bronze_df.select(
    F.col("customer_id"),
    normalize_udf(F.col("company_name")).alias("company_name_normalized"),
)

Date Parsing and Standardization

Parse dates from multiple formats and standardize to UTC timestamps with timezone awareness.


from pyspark.sql import functions as F

# Handle multiple date formats
date_patterns = [
    "yyyy-MM-dd",
    "MM/dd/yyyy",
    "dd-MMM-yyyy",
    "yyyy-MM-dd'T'HH:mm:ss",
    "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
]

# Try each format; with the default (non-ANSI) parser, to_date returns
# NULL on a mismatch, so coalesce picks the first pattern that parses
parsed_date = F.coalesce(
    *[F.to_date(F.col("date_string"), pattern) for pattern in date_patterns]
)

date_standardized_df = bronze_df.select(
    F.col("transaction_id"),
    F.to_timestamp(
        F.col("created_at"),
        "yyyy-MM-dd'T'HH:mm:ss"
    ).alias("created_at_utc"),
)

# Convert source-local timestamps to UTC; to_utc_timestamp interprets the
# value as local time in from_tz and shifts it to UTC
from_tz = "America/New_York"  # Source timezone

tz_converted_df = date_standardized_df.select(
    F.col("transaction_id"),
    F.to_utc_timestamp(F.col("created_at_utc"), from_tz).alias("created_at_utc_normalized"),
)
)

# Extract date components for partitioning and aggregation
enriched_df = tz_converted_df.select(
    "*",
    F.to_date(F.col("created_at_utc_normalized")).alias("created_date"),
    F.year(F.col("created_at_utc_normalized")).alias("created_year"),
    F.month(F.col("created_at_utc_normalized")).alias("created_month"),
    F.dayofweek(F.col("created_at_utc_normalized")).alias("created_dayofweek"),
)

Removing Duplicates with Window Functions

Use window functions to identify and remove duplicates based on business keys, keeping the most recent or highest-priority record.


from pyspark.sql import Window
from pyspark.sql.functions import row_number, dense_rank, rank

# Simple deduplication: keep the latest record per customer_id
window_spec = Window.partitionBy("customer_id").orderBy(F.desc("_bronze_loaded_at"))

dedup_df = bronze_df.withColumn(
    "row_num",
    F.row_number().over(window_spec)
).filter(F.col("row_num") == 1).drop("row_num")

# Composite key deduplication with precedence
window_spec = Window.partitionBy("customer_id", "account_id").orderBy(
    F.desc("data_quality_score"),  # Prefer high-quality records
    F.desc("_bronze_loaded_at"),    # Then most recent
)

dedup_df = bronze_df.withColumn(
    "rn",
    F.row_number().over(window_spec)
).filter(F.col("rn") == 1).drop("rn")

# Rank duplicates and log lower-rank versions
window_spec = Window.partitionBy("customer_id").orderBy(F.desc("_bronze_loaded_at"))

ranked_df = bronze_df.withColumn(
    "dup_rank",
    F.row_number().over(window_spec)
)

# Keep top 1, quarantine the rest
valid_df = ranked_df.filter(F.col("dup_rank") == 1).drop("dup_rank")
duplicate_records = ranked_df.filter(F.col("dup_rank") > 1)

duplicate_records.write \
    .format("delta") \
    .mode("append") \
    .save("/mnt/data/quarantine/dim_customer_duplicates")

4. Deduplication Strategies

Exact Deduplication with dropDuplicates

The simplest approach: remove rows that are identical across all columns, or specific business key columns.


# Remove complete row duplicates (all columns must match)
dedup_df = bronze_df.dropDuplicates()

# Remove duplicates based on specific columns (business keys)
dedup_df = bronze_df.dropDuplicates(["customer_id", "account_id"])

# dropDuplicates keeps the first occurrence (non-deterministic in distributed context)
# For deterministic behavior, prefer window functions

Fuzzy Deduplication

Handle near-duplicates using fuzzy matching on strings and allowing small numeric differences.


from pyspark.sql import functions as F
from difflib import SequenceMatcher

# Simple fuzzy match using string similarity
def string_similarity(s1, s2):
    """Return similarity ratio between 0 and 1."""
    if s1 is None or s2 is None:
        return 0.0
    return SequenceMatcher(None, s1, s2).ratio()

string_sim_udf = F.udf(string_similarity, "double")

# Find near-duplicate names
customer_df = bronze_df.select("customer_id", "customer_name")

# Self-join to find similar names (O(n²) — restrict with blocking keys at scale)
similar_df = customer_df.alias("a").join(
    customer_df.alias("b"),
    F.col("a.customer_id") < F.col("b.customer_id")
).select(
    F.col("a.customer_id").alias("customer_id_a"),
    F.col("a.customer_name").alias("name_a"),
    F.col("b.customer_id").alias("customer_id_b"),
    F.col("b.customer_name").alias("name_b"),
    string_sim_udf(
        F.lower(F.col("a.customer_name")),
        F.lower(F.col("b.customer_name"))
    ).alias("similarity_score"),
)

# Filter for high-similarity pairs (>90%)
high_similarity = similar_df.filter(F.col("similarity_score") > 0.9)

# Flag and quarantine fuzzy duplicates
high_similarity.write \
    .format("delta") \
    .mode("append") \
    .save("/mnt/data/quarantine/fuzzy_duplicates")

# Built-in, expression-based alternatives avoid Python UDF overhead:
# F.levenshtein(col_a, col_b) for edit distance and F.soundex(col) for
# phonetic matching both run natively on the JVM.

# Illustrative hand-rolled Soundex (F.soundex is the built-in equivalent)
def soundex(name):
    """Generate Soundex code for fuzzy phonetic matching."""
    if not name:
        return None
    name = name.upper()
    first_letter = name[0]
    name = name[1:]

    mapping = {
        'B': '1', 'F': '1', 'P': '1', 'V': '1',
        'C': '2', 'G': '2', 'J': '2', 'K': '2', 'Q': '2', 'S': '2', 'X': '2', 'Z': '2',
        'D': '3', 'T': '3',
        'L': '4',
        'M': '5', 'N': '5',
        'R': '6',
    }

    code = first_letter
    prev_code = mapping.get(first_letter, '0')

    for char in name:
        digit = mapping.get(char, '0')
        if digit != '0' and digit != prev_code:
            code += digit
        if len(code) == 4:
            break
        prev_code = digit

    return code.ljust(4, '0')

soundex_udf = F.udf(soundex, "string")

phonetic_df = bronze_df.select(
    F.col("customer_id"),
    F.col("customer_name"),
    soundex_udf(F.col("customer_name")).alias("name_soundex"),
)

MERGE INTO for Incremental Deduplication

Use Delta Lake MERGE for idempotent, incremental updates with deduplication logic.


from delta.tables import DeltaTable

# Create or reference Silver table
silver_path = "/mnt/data/silver/dim_customer"
silver_table = DeltaTable.forPath(spark, silver_path)

# New records from Bronze (deduplicated)
new_records = bronze_df.dropDuplicates(["customer_id"])

# MERGE: update if exists, insert if new
silver_table.alias("target") \
    .merge(
        new_records.alias("source"),
        "target.customer_id = source.customer_id"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

# With explicit deduplication priority (keep most recent)
from pyspark.sql import Window

window_spec = Window.partitionBy("customer_id").orderBy(F.desc("_bronze_loaded_at"))
dedup_new_records = new_records.withColumn(
    "rn", F.row_number().over(window_spec)
).filter(F.col("rn") == 1).drop("rn")

silver_table.alias("target") \
    .merge(
        dedup_new_records.alias("source"),
        "target.customer_id = source.customer_id"
    ) \
    .whenMatchedUpdate(
        set={
            "customer_name": "source.customer_name",
            "email": "source.email",
            "updated_at": F.current_timestamp(),
        }
    ) \
    .whenNotMatchedInsert(
        values={
            "customer_id": "source.customer_id",
            "customer_name": "source.customer_name",
            "email": "source.email",
            "created_at": F.current_timestamp(),
            "updated_at": F.current_timestamp(),
        }
    ) \
    .execute()
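
After a MERGE completes, Delta's transaction log records operation metrics that can confirm the upsert behaved as expected (for example, that a re-run of an idempotent merge updated nothing); a quick check:

```sql
-- Recent operations on the Silver table; operationMetrics carries
-- numTargetRowsUpdated / numTargetRowsInserted for each MERGE
DESCRIBE HISTORY delta.`/mnt/data/silver/dim_customer` LIMIT 5;
```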

Watermark-Based Deduplication in Streaming

For streaming Silver tables, use watermarks to allow late-arriving updates while preventing unbounded state.


from pyspark.sql import functions as F

# Streaming Silver read via Auto Loader (a schema location is required so
# the inferred schema and its evolution can be tracked)
streaming_silver_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/mnt/data/.schemas/customer_events") \
    .load("/mnt/data/bronze/customer_events") \
    .select("customer_id", "event_timestamp", "email", "phone")

# Add watermark to allow late-arriving updates (up to 1 hour late)
watermarked_df = streaming_silver_df.withWatermark("event_timestamp", "1 hour")

# Deduplication within the watermark window (Spark 3.5+): state is expired
# once the watermark passes, preventing unbounded growth
dedup_streaming = watermarked_df.dropDuplicatesWithinWatermark(["customer_id"])

# For more control, use foreachBatch with state
def upsert_to_silver(batch_df, batch_id):
    """Called for each micro-batch."""
    from delta.tables import DeltaTable
    from pyspark.sql import Window

    silver_table = DeltaTable.forPath(spark, "/mnt/data/silver/dim_customer")

    # Deduplicate within batch
    window_spec = Window.partitionBy("customer_id").orderBy(F.desc("event_timestamp"))
    batch_dedup = batch_df.withColumn(
        "rn", F.row_number().over(window_spec)
    ).filter(F.col("rn") == 1).drop("rn")

    # Merge into Silver
    silver_table.alias("target") \
        .merge(
            batch_dedup.alias("source"),
            "target.customer_id = source.customer_id"
        ) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

streaming_silver_df.writeStream \
    .foreachBatch(upsert_to_silver) \
    .option("checkpointLocation", "/mnt/data/.checkpoints/silver_customer") \
    .start()

5. Schema Enforcement & Evolution

Delta Lake Schema Enforcement on Write

Delta Lake validates incoming data against the stored table schema. On append, extra columns are rejected; columns missing from the DataFrame are written as NULL, unless they carry a NOT NULL constraint, in which case the write fails.


from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Define strict schema
silver_schema = StructType([
    StructField("customer_id", StringType(), False),
    StructField("customer_name", StringType(), False),
    StructField("email", StringType(), True),
    StructField("created_at", TimestampType(), False),
])

# Write with strict schema enforcement (Delta's default on append);
# nullability travels with the DataFrame schema, not a writer option
typed_df = spark.createDataFrame(df.rdd, silver_schema)

typed_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "false") \
    .save("/mnt/data/silver/dim_customer")

# Attempt to write with extra column (will fail)
df_with_extra = df.withColumn("extra_col", F.lit("value"))

try:
    df_with_extra.write \
        .format("delta") \
        .mode("append") \
        .save("/mnt/data/silver/dim_customer")
except Exception as e:
    print(f"Schema mismatch: {e}")
    # Schema violation caught at write-time

mergeSchema vs overwriteSchema

Two strategies for schema changes: mergeSchema adds columns, overwriteSchema replaces entirely.


# mergeSchema: Add new columns, keep existing ones
df_with_new_col = bronze_df.withColumn("phone_normalized", F.regexp_replace(F.col("phone"), r"[^\d]", ""))

df_with_new_col.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/mnt/data/silver/dim_customer")

# overwriteSchema: Replace entire schema (destructive)
df_redesigned = bronze_df.select("customer_id", "customer_name", "created_at", "updated_at")

df_redesigned.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/mnt/data/silver/dim_customer")

# Additive evolution: introduce the renamed/retyped column alongside the old
# one (mergeSchema on append adds email_primary but does not remove old_email
# from the table schema — dropping it requires ALTER TABLE)
evolved_df = bronze_df \
    .withColumn("email_primary", F.col("email_address").cast("string"))

evolved_df.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/mnt/data/silver/dim_customer")

Column Mapping and Backward Compatibility

Column mapping allows renaming without schema breaking changes. Track both physical and logical names.


# Enable column mapping for safe refactoring
spark.sql("""
    ALTER TABLE delta.`/mnt/data/silver/dim_customer`
    SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
""")

# Now safely rename columns
spark.sql("""
    ALTER TABLE delta.`/mnt/data/silver/dim_customer`
    RENAME COLUMN old_customer_id TO customer_id
""")

# Drop columns safely (requires column mapping, enabled above)
spark.sql("""
    ALTER TABLE delta.`/mnt/data/silver/dim_customer`
    DROP COLUMN legacy_phone_type
""")

# Type changes: Delta only allows widening, and only with type widening
# enabled (e.g. increasing decimal precision)
spark.sql("""
    ALTER TABLE delta.`/mnt/data/silver/dim_customer`
    SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
""")
spark.sql("""
    ALTER TABLE delta.`/mnt/data/silver/dim_customer`
    ALTER COLUMN discount_pct TYPE DECIMAL(5,3)
""")

# Breaking changes: use OVERWRITE with migration window
# 1. Create new table with new schema
spark.sql("""
    CREATE TABLE silver.dim_customer_v2
    USING DELTA
    AS
    SELECT
        customer_id,
        customer_name,
        CAST(discount_pct AS DECIMAL(5,3)) AS discount_pct,
        created_at
    FROM silver.dim_customer
""")

# 2. Run shadow queries to validate
# 3. Switch over (update downstream queries)
# 4. Drop old table after cutover window
spark.sql("DROP TABLE silver.dim_customer")
spark.sql("ALTER TABLE silver.dim_customer_v2 RENAME TO dim_customer")
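
Step 2's shadow validation, run before the cutover in steps 3 and 4, can be a pair of comparison queries; a sketch:

```sql
-- Row counts should match: the type change must not drop rows
SELECT
    (SELECT COUNT(*) FROM silver.dim_customer)    AS old_count,
    (SELECT COUNT(*) FROM silver.dim_customer_v2) AS new_count;

-- Business keys present in the old table but missing from the new one
SELECT customer_id FROM silver.dim_customer
EXCEPT
SELECT customer_id FROM silver.dim_customer_v2;
```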

6. SCD Type 2 Implementation

Full PySpark MERGE INTO for SCD Type 2

SCD Type 2 tracks the full history of dimensional changes with effective/end dates and current flags.


from delta.tables import DeltaTable
from pyspark.sql import functions as F
from datetime import datetime

# Create SCD Type 2 Silver dimension with history
silver_table_path = "/mnt/data/silver/dim_customer_scd2"

# Initialize table with schema
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.dim_customer_scd2 (
        customer_id STRING NOT NULL,
        customer_name STRING,
        email STRING,
        phone STRING,
        address STRING,
        effective_date DATE NOT NULL,
        end_date DATE,
        is_current BOOLEAN NOT NULL,
        _scd_version INT,
        _inserted_at TIMESTAMP,
        _updated_at TIMESTAMP
    )
    USING DELTA
    LOCATION '{silver_table_path}'
""")

# New records from Bronze
new_records = spark.sql("""
    SELECT DISTINCT
        customer_id,
        customer_name,
        email,
        phone,
        address,
        CURRENT_DATE() as effective_date
    FROM bronze.dim_customer
    WHERE customer_id IS NOT NULL
""")

# SCD Type 2 MERGE logic
silver_table = DeltaTable.forPath(spark, silver_table_path)

silver_table.alias("target") \
    .merge(
        new_records.alias("source"),
        """
        target.customer_id = source.customer_id
        AND target.is_current = true
        """
    ) \
    .whenMatchedUpdate(
        condition="""
            NOT (target.customer_name <=> source.customer_name)
            OR NOT (target.email <=> source.email)
            OR NOT (target.phone <=> source.phone)
            OR NOT (target.address <=> source.address)
        """,
        set={
            "end_date": F.date_sub(F.current_date(), 1),
            "is_current": F.lit(False),
            "_updated_at": F.current_timestamp(),
        }
    ) \
    .whenNotMatchedInsert(
        values={
            "customer_id": "source.customer_id",
            "customer_name": "source.customer_name",
            "email": "source.email",
            "phone": "source.phone",
            "address": "source.address",
            "effective_date": "source.effective_date",
            "end_date": F.lit(None).cast("date"),
            "is_current": F.lit(True),
            "_scd_version": F.lit(1),
            "_inserted_at": F.current_timestamp(),
            "_updated_at": F.current_timestamp(),
        }
    ) \
    .execute()

# Step 2: insert new versions for the records expired above; register the
# source DataFrame so the SQL below can reference it
new_records.createOrReplaceTempView("new_records")

# A changed customer now has no current row (step 1 expired it), while
# unchanged and brand-new customers still do — filter on that distinction
changed_records = spark.sql("""
    SELECT
        source.customer_id,
        source.customer_name,
        source.email,
        source.phone,
        source.address,
        source.effective_date,
        CAST(NULL AS DATE) AS end_date,
        true AS is_current,
        COALESCE(MAX(target._scd_version), 0) + 1 AS _scd_version,
        CURRENT_TIMESTAMP() AS _inserted_at,
        CURRENT_TIMESTAMP() AS _updated_at
    FROM new_records source
    JOIN silver.dim_customer_scd2 target
        ON source.customer_id = target.customer_id
    GROUP BY source.customer_id, source.customer_name, source.email,
             source.phone, source.address, source.effective_date
    HAVING MAX(CASE WHEN target.is_current THEN 1 ELSE 0 END) = 0
""")

changed_records.write \
    .format("delta") \
    .mode("append") \
    .save(silver_table_path)

SQL-based SCD Type 2

A pure SQL approach to SCD Type 2: first expire changed versions with MERGE INTO, then insert their replacements.


-- Stage the deduplicated source once; a CTE's scope ends at its statement's
-- semicolon, so a temp view is shared by the two statements below instead
CREATE OR REPLACE TEMP VIEW new_data AS
SELECT DISTINCT
    customer_id,
    customer_name,
    email,
    phone,
    address,
    CURRENT_DATE() AS effective_date
FROM bronze.dim_customer
WHERE customer_id IS NOT NULL;

-- Step 1: expire the current version of any customer whose attributes changed
-- (this variant assumes the dimension carries a surrogate key, customer_key)
MERGE INTO silver.dim_customer_scd2 target
USING (
    SELECT
        t.customer_key,
        DATE_SUB(CURRENT_DATE(), 1) AS end_date
    FROM silver.dim_customer_scd2 t
    INNER JOIN new_data s
        ON t.customer_id = s.customer_id
    WHERE t.is_current = true
        AND (
            NOT (t.customer_name <=> s.customer_name)
            OR NOT (t.email <=> s.email)
            OR NOT (t.phone <=> s.phone)
            OR NOT (t.address <=> s.address)
        )
) source
ON target.customer_key = source.customer_key
WHEN MATCHED THEN
    UPDATE SET
        end_date = source.end_date,
        is_current = false,
        _updated_at = CURRENT_TIMESTAMP();

-- Step 2: insert a first version for new customers and a fresh version for
-- customers expired in step 1 (those left with no current row);
-- customer_key is assumed to be a generated identity column and is omitted
INSERT INTO silver.dim_customer_scd2 (
    customer_id, customer_name, email, phone, address,
    effective_date, end_date, is_current,
    _scd_version, _inserted_at, _updated_at
)
SELECT
    s.customer_id,
    s.customer_name,
    s.email,
    s.phone,
    s.address,
    s.effective_date,
    CAST(NULL AS DATE) AS end_date,
    true AS is_current,
    COALESCE(v.max_version, 0) + 1 AS _scd_version,
    CURRENT_TIMESTAMP() AS _inserted_at,
    CURRENT_TIMESTAMP() AS _updated_at
FROM new_data s
LEFT JOIN (
    SELECT
        customer_id,
        MAX(_scd_version) AS max_version,
        MAX(CASE WHEN is_current THEN 1 ELSE 0 END) AS has_current
    FROM silver.dim_customer_scd2
    GROUP BY customer_id
) v ON s.customer_id = v.customer_id
WHERE v.customer_id IS NULL  -- brand-new customer
   OR v.has_current = 0;     -- current version was just expired in step 1
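
With effective and end dates maintained, the dimension supports point-in-time queries; a sketch reconstructing the dimension as of an arbitrary business date:

```sql
-- The customer dimension exactly as it stood on 2024-06-30
SELECT customer_id, customer_name, email
FROM silver.dim_customer_scd2
WHERE effective_date <= DATE '2024-06-30'
  AND (end_date >= DATE '2024-06-30' OR end_date IS NULL);
```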

Late-Arriving Updates in SCD Type 2

Handle updates that arrive out of order with an effective date in the past, which requires splicing the new version into the existing history.


from pyspark.sql import functions as F

# Scenario: receive an update whose effective_date lies in the past
late_update = spark.createDataFrame([
    ("CUST-001", "John Smith", "john.smith@example.com", "2026-02-15")
], ["customer_id", "customer_name", "email", "effective_date"]) \
    .withColumn("effective_date", F.to_date("effective_date"))

# Find if this update falls between existing versions
late_update_scd = late_update.alias("new") \
    .join(
        spark.table("silver.dim_customer_scd2").alias("target"),
        (F.col("new.customer_id") == F.col("target.customer_id"))
        & (F.col("new.effective_date").between(
            F.col("target.effective_date"),
            F.coalesce(F.col("target.end_date"), F.current_date())
        )),
        "left"
    )

# Case 1: Update falls within existing version (replace it)
# Case 2: Update is newer (standard SCD Type 2)
# Case 3: Update is between versions (splice history)

# Splice approach: rebuild the customer's history around the late record
from datetime import timedelta

def handle_late_update(customer_id, new_effective_date, new_data):
    """Sketch: rewrite one customer's SCD Type 2 history with a late update.

    new_data is a dict of the attribute columns for the late record.
    """
    history = [
        row.asDict() for row in spark.sql(f"""
            SELECT * FROM silver.dim_customer_scd2
            WHERE customer_id = '{customer_id}'
            ORDER BY effective_date
        """).collect()
    ]

    # Find the first version that starts after the late update
    insert_idx = next(
        (i for i, row in enumerate(history)
         if row["effective_date"] > new_effective_date),
        len(history)
    )

    # The preceding version now ends the day before the late update
    if insert_idx > 0:
        history[insert_idx - 1]["end_date"] = new_effective_date - timedelta(days=1)
        history[insert_idx - 1]["is_current"] = False

    # The spliced-in version ends where its successor begins, or stays open
    new_row = dict(new_data, customer_id=customer_id, effective_date=new_effective_date)
    if insert_idx < len(history):
        new_row["end_date"] = history[insert_idx]["effective_date"] - timedelta(days=1)
        new_row["is_current"] = False
    else:
        new_row["end_date"] = None
        new_row["is_current"] = True
    history.insert(insert_idx, new_row)

    # Caller should overwrite this customer's slice atomically,
    # e.g. with a replaceWhere overwrite on customer_id
    return history

7. Data Quality Checks

Great Expectations Integration

Use Great Expectations for comprehensive data validation with metrics logging and failure handling.


# Legacy (pre-1.0) Great Expectations Spark dataset API; newer releases use
# the Fluent API instead
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
import json

# Load and validate Silver data
silver_df = spark.read.format("delta").load("/mnt/data/silver/dim_customer")
silver_ge = SparkDFDataset(silver_df)

# Define expectations
expectations = [
    # Column presence
    ("expect_table_columns_to_match_ordered_list", {
        "column_list": ["customer_id", "customer_name", "email", "created_at"]
    }),

    # Non-null expectations
    ("expect_column_values_to_not_be_null", {
        "column": "customer_id"
    }),
    ("expect_column_values_to_not_be_null", {
        "column": "customer_name"
    }),

    # Value range expectations
    ("expect_column_values_to_be_between", {
        "column": "discount_pct",
        "min_value": 0.0,
        "max_value": 1.0
    }),

    # Pattern matching
    ("expect_column_values_to_match_regex", {
        "column": "email",
        "regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    }),

    # Uniqueness
    ("expect_column_values_to_be_unique", {
        "column": "customer_id"
    }),

    # Custom metric
    ("expect_table_row_count_to_be_between", {
        "min_value": 1000,
        "max_value": 10000000
    }),
]

# Run expectations; keep a uniform record shape so the results convert
# cleanly to a DataFrame
results = []
for expectation_name, kwargs in expectations:
    try:
        result = getattr(silver_ge, expectation_name)(**kwargs)
        results.append({
            "expectation": expectation_name,
            "passed": bool(result.success),
            "detail": json.dumps(getattr(result, "result", None), default=str),
        })
    except Exception as e:
        results.append({
            "expectation": expectation_name,
            "passed": False,
            "detail": str(e),
        })

# Log results
results_df = spark.createDataFrame(
    results, schema="expectation string, passed boolean, detail string"
)
results_df.write \
    .format("delta") \
    .mode("append") \
    .save("/mnt/data/silver/_quality_checks")

# Fail pipeline if critical expectations fail
critical_failures = [r for r in results if not r["passed"] and r["expectation"] in [
    "expect_column_values_to_not_be_null",
    "expect_column_values_to_be_unique",
]]

if critical_failures:
    raise Exception(f"Data quality validation failed: {critical_failures}")

DLT Expectations Framework

Use Delta Live Tables (DLT) expectations for declarative quality rules.


import dlt
from pyspark.sql import functions as F

# Declare the Silver table and attach expectations declaratively
@dlt.table(
    name="dim_customer",
    comment="Customer dimension cleansed in the Silver layer",
    path="/mnt/data/silver/dim_customer",
)
@dlt.expect("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect("customer_id_length", "LENGTH(customer_id) <= 50")
@dlt.expect("email_format", "email RLIKE '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$' OR email IS NULL")
@dlt.expect_or_drop("valid_discount_range", "discount_pct >= 0 AND discount_pct <= 1")
@dlt.expect_or_fail("created_date_valid", "created_at <= CURRENT_TIMESTAMP()")
def dim_customer():
    """Cleansed customer dimension."""
    return (
        spark.read.format("delta").load("/mnt/data/bronze/customer")
        .select(
            F.col("id").cast("string").alias("customer_id"),
            F.initcap(F.trim(F.col("name"))).alias("customer_name"),
            F.lower(F.trim(F.col("email"))).alias("email"),
            F.col("discount").cast("decimal(5,2)").alias("discount_pct"),
            F.col("created_at").cast("timestamp").alias("created_at"),
        )
    )

# Quality metrics can also be materialized as a separate DLT table
@dlt.table(name="dim_customer_quality_metrics")
def quality_checks():
    return (
        dlt.read("dim_customer")
        .agg(
            F.count("*").alias("total_records"),
            F.countDistinct("customer_id").alias("unique_customers"),
            F.sum(F.when(F.col("email").isNull(), 1).otherwise(0)).alias("null_emails"),
            F.min("created_at").alias("earliest_created"),
            F.max("created_at").alias("latest_created"),
        )
    )

Custom Quality Check Framework

Build a reusable quality check framework with metrics logging and quarantine routing.


from pyspark.sql import functions as F
from pyspark.sql import Window
from datetime import datetime

class SilverQualityValidator:
    """Reusable Silver layer quality validation framework."""

    def __init__(self, spark_session, table_name, table_path):
        self.spark = spark_session
        self.table_name = table_name
        self.table_path = table_path
        self.checks_run = []
        self.quarantine_records = None

    def check_null_columns(self, column_list, severity="ERROR"):
        """Validate specified columns are not null."""
        df = self.spark.read.format("delta").load(self.table_path)
        total_count = df.count()  # count once, reuse for every column

        for col in column_list:
            null_count = df.filter(F.col(col).isNull()).count()
            null_pct = (null_count / total_count * 100) if total_count > 0 else 0

            passed = null_count == 0
            self.checks_run.append({
                "check_name": f"null_check_{col}",
                "table_name": self.table_name,
                "severity": severity,
                "passed": passed,
                "null_count": null_count,
                "null_percentage": null_pct,
                "check_timestamp": datetime.utcnow().isoformat(),
            })

            if not passed and severity == "ERROR":
                quarantine_df = df.filter(F.col(col).isNull())
                self._quarantine_records(quarantine_df, f"null_{col}")

    def check_value_range(self, column, min_val, max_val, severity="ERROR"):
        """Validate column values are within range."""
        df = self.spark.read.format("delta").load(self.table_path)

        out_of_range = df.filter(
            (F.col(column) < min_val) | (F.col(column) > max_val)
        )

        out_of_range_count = out_of_range.count()
        total_count = df.count()
        out_of_range_pct = (out_of_range_count / total_count * 100) if total_count > 0 else 0

        passed = out_of_range_count == 0
        self.checks_run.append({
            "check_name": f"range_check_{column}",
            "table_name": self.table_name,
            "severity": severity,
            "passed": passed,
            "out_of_range_count": out_of_range_count,
            "out_of_range_percentage": out_of_range_pct,
            "expected_range": f"[{min_val}, {max_val}]",
            "check_timestamp": datetime.utcnow().isoformat(),
        })

        if not passed and severity == "ERROR":
            self._quarantine_records(out_of_range, f"range_violation_{column}")

    def check_uniqueness(self, column_list, severity="ERROR"):
        """Validate uniqueness of column(s)."""
        df = self.spark.read.format("delta").load(self.table_path)

        total_count = df.count()
        unique_count = df.select(*column_list).distinct().count()
        duplicates = total_count - unique_count

        passed = duplicates == 0
        self.checks_run.append({
            "check_name": f"uniqueness_{','.join(column_list)}",
            "table_name": self.table_name,
            "severity": severity,
            "passed": passed,
            "duplicate_count": duplicates,
            "duplicate_percentage": (duplicates / total_count * 100) if total_count > 0 else 0,
            "check_timestamp": datetime.utcnow().isoformat(),
        })

        if not passed and severity == "ERROR":
            window_spec = Window.partitionBy(*column_list).orderBy(F.desc("_bronze_loaded_at"))
            duplicate_df = df.withColumn(
                "rn", F.row_number().over(window_spec)
            ).filter(F.col("rn") > 1).drop("rn")
            self._quarantine_records(duplicate_df, f"duplicates_{','.join(column_list)}")

    def _quarantine_records(self, df, reason):
        """Write failing records to quarantine."""
        df.select("*", F.lit(reason).alias("quarantine_reason")) \
            .write \
            .format("delta") \
            .mode("append") \
            .save(f"/mnt/data/quarantine/{self.table_name}_{reason}")

    def report(self):
        """Return quality check report."""
        report_df = self.spark.createDataFrame(self.checks_run)

        report_df.write \
            .format("delta") \
            .mode("append") \
            .save(f"/mnt/data/silver/_quality_reports/{self.table_name}")

        failed_checks = [c for c in self.checks_run if not c["passed"]]
        if failed_checks:
            print(f"WARNING: {len(failed_checks)} quality checks failed")
            for check in failed_checks:
                print(f"  - {check['check_name']}: {check}")

        return report_df

# Usage
validator = SilverQualityValidator(
    spark,
    "dim_customer",
    "/mnt/data/silver/dim_customer"
)

validator.check_null_columns(["customer_id", "customer_name"])
validator.check_value_range("discount_pct", 0.0, 1.0)
validator.check_uniqueness(["customer_id"])
validator.report()
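The severity flag on each check drives the promotion decision: a failed ERROR check should block the pipeline, while a failed WARN check is only reported. A pure-Python sketch of that gating logic (hypothetical check dicts in the same shape the validator records):

```python
def gate_pipeline(checks):
    """Decide pipeline outcome from check results: any failed ERROR-severity
    check blocks promotion; failed WARN checks are only reported."""
    errors = [c for c in checks if not c["passed"] and c["severity"] == "ERROR"]
    warnings = [c for c in checks if not c["passed"] and c["severity"] == "WARN"]
    return {"promote": not errors, "errors": errors, "warnings": warnings}

checks = [
    {"check_name": "null_check_customer_id", "severity": "ERROR", "passed": True},
    {"check_name": "range_check_discount_pct", "severity": "WARN", "passed": False},
]

print(gate_pipeline(checks)["promote"])  # True: only a WARN-level check failed
```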

8. Joins & Enrichment

Star Schema Dimension Joining

Join fact tables with dimensions at Silver layer for analytical readiness.


from pyspark.sql import functions as F

# Star schema: fact_order + dimensions
fact_order = spark.read.format("delta").load("/mnt/data/silver/fact_order")
dim_customer = spark.read.format("delta").load("/mnt/data/silver/dim_customer")
dim_product = spark.read.format("delta").load("/mnt/data/silver/dim_product")
dim_date = spark.read.format("delta").load("/mnt/data/silver/dim_date")

# Multi-dimensional join (alias each DataFrame so qualified column
# references like "f.order_date" resolve)
enriched_orders = (
    fact_order.alias("f")
    .join(
        dim_customer.filter(F.col("is_current") == True).alias("c"),  # SCD Type 2: only current
        on="customer_id",
        how="left"
    )
    .join(
        dim_product.filter(F.col("is_current") == True).alias("p"),
        on="product_id",
        how="left"
    )
    .join(
        dim_date.alias("d"),
        on=F.col("f.order_date") == F.col("d.date_key"),
        how="left"
    )
    .select(
        "f.*",
        F.col("c.customer_name"),
        F.col("c.email"),
        F.col("p.product_name"),
        F.col("p.category"),
        F.col("p.price"),
        F.col("d.fiscal_year"),
        F.col("d.fiscal_quarter"),
    )
)

enriched_orders.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("order_date") \
    .save("/mnt/data/silver/fact_order_enriched")

Broadcast Joins for Small Dimensions

Use broadcast hints to optimize joins when dimension tables are small enough to fit in memory.


from pyspark.sql import functions as F

# Broadcast small dimension tables
dim_country = spark.read.format("delta").load("/mnt/data/silver/dim_country")  # ~300 rows
dim_category = spark.read.format("delta").load("/mnt/data/silver/dim_category")  # ~100 rows

fact_order = spark.read.format("delta").load("/mnt/data/silver/fact_order")

# Explicit broadcast hint
enriched = fact_order.join(
    F.broadcast(dim_country),
    on="country_id",
    how="left"
).join(
    F.broadcast(dim_category),
    on="product_category_id",
    how="left"
)

# Or rely on auto-broadcast (Spark is lazy, so the threshold in effect
# when the action runs is what matters)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50MB")

# Check what got broadcast
enriched.explain()  # Shows broadcast in the physical plan

Handling Skewed Joins with Salting

When one value dominates a join column (data skew), add salt keys to distribute processing.


from pyspark.sql import functions as F

# Identify skew: inspect the count distribution per join key
fact_order.groupBy("product_id").count().orderBy(F.desc("count")).show()

# If product_id=123 has 40% of rows, salt it
salt_factor = 10  # distribute skewed value across 10 partitions

salted_fact = fact_order.withColumn(
    "salt_key",
    F.when(
        F.col("product_id") == "123",
        F.expr(f"CAST(RAND() * {salt_factor} AS INT)")
    ).otherwise(F.lit(0))
)

salted_dim = dim_product.withColumn(
    "salt_key",
    F.explode(F.array(*[F.lit(i) for i in range(salt_factor)]))
)

# Join on (product_id, salt_key)
enriched = salted_fact.join(
    salted_dim,
    on=["product_id", "salt_key"],
    how="left"
).drop("salt_key")

enriched.write.format("delta").mode("overwrite").save("/mnt/data/silver/fact_order_enriched")
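The effect of salting is easier to see in plain Python. This sketch (hypothetical row counts, the same salt_factor of 10) shows how the hot key's rows spread across ten salted buckets while a normal key stays in a single bucket on salt 0:

```python
import random
from collections import Counter

random.seed(42)
salt_factor = 10

# 1,000 rows for the hot key "123", 100 rows for a normal key "456"
rows = [("123", None)] * 1000 + [("456", None)] * 100

# Same salting rule as the Spark code: random salt for the hot key only
salted = [
    (key, random.randrange(salt_factor) if key == "123" else 0)
    for key, _ in rows
]

buckets = Counter((key, salt) for key, salt in salted)
# Hot key is spread over 10 buckets of roughly 100 rows each;
# the normal key remains in one bucket
print(len([b for b in buckets if b[0] == "123"]))  # 10
print(buckets[("456", 0)])                          # 100
```

Because the dimension side is exploded across every salt value, each salted fact bucket still finds its matching dimension row, so the join result is unchanged.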

Slowly Changing Dimension Lookups

Join facts with SCD Type 2 dimensions to get the correct version at the fact's transaction date.


from pyspark.sql import functions as F

# Fact table with order_date
fact_order = spark.read.format("delta").load("/mnt/data/silver/fact_order")

# SCD Type 2 dimension
dim_customer_scd2 = spark.read.format("delta").load("/mnt/data/silver/dim_customer_scd2")

# Join to get the customer version in effect at order time
correct_version = (
    fact_order.alias("f")
    .join(
        dim_customer_scd2.alias("d"),
        on=[
            F.col("f.customer_id") == F.col("d.customer_id"),
            F.col("f.order_date").between(
                F.col("d.effective_date"),
                F.coalesce(F.col("d.end_date"), F.current_date())
            )
        ],
        how="left"
    )
    .select(
        "f.*",
        F.col("d.customer_name"),
        F.col("d._scd_version"),
    )
)

# Alternative: look up only the current version (is_current = true)
current_version = (
    fact_order.alias("f")
    .join(
        dim_customer_scd2.filter(F.col("is_current") == True).alias("d"),
        on="customer_id",
        how="left"
    )
    .select(
        "f.*",
        F.col("d.customer_name"),
    )
)
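The point-in-time lookup reduces to a simple interval search, which a pure-Python sketch makes concrete (hypothetical version rows; the NULL end_date of the current version is coalesced to today, as in the join above):

```python
from datetime import date

versions = [
    {"customer_name": "Jane Doe", "effective_date": date(2024, 1, 1),
     "end_date": date(2024, 6, 30)},
    {"customer_name": "Jane Smith", "effective_date": date(2024, 7, 1),
     "end_date": None},  # current version: open-ended
]

def version_at(versions, as_of):
    """Return the SCD Type 2 version whose validity interval contains as_of,
    treating a NULL end_date as 'still current'."""
    for v in versions:
        end = v["end_date"] or date.today()
        if v["effective_date"] <= as_of <= end:
            return v
    return None

print(version_at(versions, date(2024, 3, 15))["customer_name"])  # Jane Doe
print(version_at(versions, date(2024, 8, 1))["customer_name"])   # Jane Smith
```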

9. Incremental Processing Patterns

Change Data Feed (CDF) from Bronze to Silver

Use Change Data Feed to capture only modified records, enabling efficient incremental processing.


from pyspark.sql import functions as F

# Enable CDF on Bronze table (if not already enabled)
spark.sql("""
    ALTER TABLE delta.`/mnt/data/bronze/customer`
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read changes since last run; last_processed_version is loaded from
# pipeline metadata (see the tracking step at the end of this example)
cdf_df = spark.read \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", last_processed_version) \
    .load("/mnt/data/bronze/customer")

# CDF returns: _change_type (insert, update_preimage, update_postimage, delete), _commit_version, _commit_timestamp
changes = cdf_df.select(
    "_change_type",
    "_commit_version",
    "_commit_timestamp",
    "customer_id",
    "customer_name",
    "email",
)

# Separate insert/update from delete
inserts_updates = changes.filter(F.col("_change_type").isin("insert", "update_postimage"))
deletes = changes.filter(F.col("_change_type") == "delete")

# Process inserts/updates (alias each expression so MERGE column names line up)
cleansed = inserts_updates.select(
    F.col("customer_id").cast("string").alias("customer_id"),
    F.initcap(F.trim(F.col("customer_name"))).alias("customer_name"),
    F.lower(F.trim(F.col("email"))).alias("email"),
)

# Merge into Silver
from delta.tables import DeltaTable
silver_table = DeltaTable.forPath(spark, "/mnt/data/silver/dim_customer")

silver_table.alias("target") \
    .merge(
        cleansed.alias("source"),
        "target.customer_id = source.customer_id"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

# Handle deletes (mark as inactive or hard delete)
if deletes.count() > 0:
    silver_table.alias("target") \
        .merge(
            deletes.select("customer_id").alias("source"),
            "target.customer_id = source.customer_id"
        ) \
        .whenMatchedUpdate(
            set={"is_active": F.lit(False), "updated_at": F.current_timestamp()}
        ) \
        .execute()

# Track last processed version
last_version = cdf_df.agg(F.max(F.col("_commit_version"))).collect()[0][0]
spark.sql(f"UPDATE _metadata SET last_cdf_version = {last_version} WHERE table_name = 'dim_customer'")

MERGE INTO with Insert/Update/Delete Conditions

Implement complex MERGE logic with different handling per operation type.


from delta.tables import DeltaTable
from pyspark.sql import functions as F

silver_table = DeltaTable.forPath(spark, "/mnt/data/silver/fact_order")
new_orders = spark.read.format("delta").load("/mnt/data/bronze/fact_order")

# Multi-condition MERGE
(
    silver_table.alias("target")
    .merge(
        new_orders.alias("source"),
        "target.order_id = source.order_id"
    )
    # Update existing: only if newer and quality improved
    .whenMatchedUpdate(
        condition="source.order_quality_score > target.order_quality_score",
        set={
            "order_amount": "source.order_amount",
            "order_status": "source.order_status",
            "updated_at": F.current_timestamp(),
        }
    )
    # Insert new: filter out test orders
    .whenNotMatchedInsert(
        condition="source.order_type != 'TEST' AND source.order_amount > 0",
        values={
            "order_id": "source.order_id",
            "customer_id": "source.customer_id",
            "order_amount": "source.order_amount",
            "order_date": "source.order_date",
            "created_at": F.current_timestamp(),
        }
    )
    .execute()
)

# SQL equivalent
spark.sql("""
    MERGE INTO silver.fact_order target
    USING bronze.fact_order source
    ON target.order_id = source.order_id
    WHEN MATCHED AND source.order_quality_score > target.order_quality_score THEN
        UPDATE SET
            order_amount = source.order_amount,
            order_status = source.order_status,
            updated_at = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED AND source.order_type != 'TEST' AND source.order_amount > 0 THEN
        INSERT (order_id, customer_id, order_amount, order_date, created_at)
        VALUES (source.order_id, source.customer_id, source.order_amount, source.order_date, CURRENT_TIMESTAMP())
""")

Streaming Silver Tables with foreachBatch

Process streaming Bronze data into Silver with exactly-once semantics via foreachBatch.


from pyspark.sql import functions as F
from pyspark.sql import Window
from delta.tables import DeltaTable

def upsert_to_silver(batch_df, batch_id):
    """Called for each micro-batch during streaming."""

    # Cleanse and deduplicate within batch
    window_spec = Window.partitionBy("customer_id").orderBy(F.desc("_bronze_loaded_at"))
    cleansed_batch = (
        batch_df
        .select(
            F.col("id").cast("string").alias("customer_id"),
            F.initcap(F.trim(F.col("name"))).alias("customer_name"),
            F.lower(F.trim(F.col("email"))).alias("email"),
            F.col("created_at").cast("timestamp").alias("created_at"),
            F.current_timestamp().alias("_silver_updated_at"),
        )
        .withColumn("rn", F.row_number().over(window_spec))
        .filter(F.col("rn") == 1)
        .drop("rn")
    )

    # Merge into Silver table
    silver_table = DeltaTable.forPath(spark, "/mnt/data/silver/dim_customer")

    silver_table.alias("target") \
        .merge(
            cleansed_batch.alias("source"),
            "target.customer_id = source.customer_id"
        ) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

    print(f"Batch {batch_id}: upserted {cleansed_batch.count()} records to Silver")

# Start streaming
streaming_bronze = spark.readStream \
    .format("delta") \
    .option("startingVersion", 0) \
    .load("/mnt/data/bronze/customer_events")

streaming_bronze \
    .writeStream \
    .foreachBatch(upsert_to_silver) \
    .option("checkpointLocation", "/mnt/data/.checkpoints/silver_customer") \
    .start()
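The within-batch deduplication step (row_number over a window, keep rn = 1) is equivalent to keeping the latest record per key. A plain-Python sketch with hypothetical events:

```python
def latest_per_key(records, key, order_field):
    """Keep only the most recent record per key, mirroring
    row_number().over(Window.partitionBy(key).orderBy(desc(order_field)))."""
    latest = {}
    for r in records:
        k = r[key]
        if k not in latest or r[order_field] > latest[k][order_field]:
            latest[k] = r
    return list(latest.values())

batch = [
    {"customer_id": "c1", "_bronze_loaded_at": 1, "email": "old@example.com"},
    {"customer_id": "c1", "_bronze_loaded_at": 2, "email": "new@example.com"},
    {"customer_id": "c2", "_bronze_loaded_at": 1, "email": "x@example.com"},
]

deduped = latest_per_key(batch, "customer_id", "_bronze_loaded_at")
print(len(deduped))  # 2: one row per customer, the latest version of c1 wins
```

Deduplicating before the MERGE matters because Delta's MERGE fails when multiple source rows match the same target row.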

Exactly-Once Processing Guarantees

Achieve idempotency through MERGE and deduplication strategies that handle retries gracefully.


from pyspark.sql import functions as F

# Add idempotency key to track duplicates across retries
def add_idempotency_key(df):
    """Add a deterministic idempotency key (NOT a random UUID)."""
    # Hash of business key + load timestamp + full row content;
    # non-string columns must be cast before concat
    return df.withColumn(
        "idempotency_key",
        F.md5(
            F.concat(
                F.col("customer_id"),
                F.col("_bronze_loaded_at").cast("string"),
                F.md5(F.to_json(F.struct("*")))  # hash of the entire row
            )
        )
    )

bronze_with_key = add_idempotency_key(bronze_df)

# MERGE with idempotency: replayed batches become no-ops
# (the Silver table schema must also include idempotency_key)
silver_table = DeltaTable.forPath(spark, "/mnt/data/silver/dim_customer")

silver_table.alias("target") \
    .merge(
        bronze_with_key.alias("source"),
        """
        target.customer_id = source.customer_id
        AND target.idempotency_key = source.idempotency_key
        """
    ) \
    .whenNotMatchedInsertAll() \
    .execute()

# Alternative: use natural idempotency via business key + timestamp
# If the exact same record (customer_id, timestamp) arrives twice, MERGE ignores it

# Verify exactly-once with deduplication count
dedup_check = bronze_with_key.groupBy("idempotency_key").count()
duplicates_in_batch = dedup_check.filter(F.col("count") > 1).count()

if duplicates_in_batch > 0:
    print(f"WARNING: Found {duplicates_in_batch} potential duplicates in batch")
    # MERGE will still be idempotent
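The key property the MERGE relies on is determinism: the same record always hashes to the same key, so a replayed batch matches existing rows and inserts nothing. A pure-Python sketch of the same md5-over-content idea using hashlib (hypothetical record shape):

```python
import hashlib
import json

def idempotency_key(record):
    """Deterministic key: md5 over business key, load timestamp,
    and the canonicalized row content."""
    content = json.dumps(record, sort_keys=True)  # stable field ordering
    raw = (
        record["customer_id"]
        + record["_bronze_loaded_at"]
        + hashlib.md5(content.encode()).hexdigest()
    )
    return hashlib.md5(raw.encode()).hexdigest()

rec = {"customer_id": "c1", "_bronze_loaded_at": "2026-01-01T00:00:00",
       "email": "a@b.co"}

# Same record -> same key on every retry; any field change -> new key
print(idempotency_key(rec) == idempotency_key(dict(rec)))                   # True
print(idempotency_key(rec) == idempotency_key({**rec, "email": "c@d.co"}))  # False
```

A random UUID would defeat the purpose: each retry would produce fresh keys and the MERGE would insert duplicates.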

10. Production Patterns

Naming Conventions and Metadata

Establish consistent naming to enable discoverability and data governance.


# Naming convention: silver_{domain}_{entity_type}_{version}
# domain: customer, order, inventory, etc.
# entity_type: dim (dimension), fact (fact table), agg (aggregate)
# version: optional, for major schema changes

table_names = {
    # Dimensions
    "silver_customer_dim": "Customer dimension with SCD Type 2",
    "silver_product_dim": "Product dimension with catalog",
    "silver_date_dim": "Date dimension with fiscal calendar",
    "silver_store_dim": "Store dimension with location hierarchy",

    # Facts
    "silver_order_fact": "Order fact table (transactional)",
    "silver_daily_sales_fact": "Daily sales aggregate",
    "silver_inventory_fact": "Inventory levels by warehouse/product",

    # Supporting tables
    "silver_order_lineitem_fact": "Order line items (normalized)",
}

# Register tables with metadata
for table_name, description in table_names.items():
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name}
        USING DELTA
        COMMENT '{description}'
        TBLPROPERTIES (
            'owner' = 'data_engineering@company.com',
            'domain' = 'core',
            'data_classification' = 'internal',
            'retention_days' = '2555',
            'refresh_frequency' = 'daily',
            'sla_freshness_hours' = '24'
        )
    """)

# Add column-level comments
spark.sql("""
    COMMENT ON COLUMN silver_customer_dim.customer_id
    IS 'Business key: unique customer identifier from source system'
""")

spark.sql("""
    COMMENT ON COLUMN silver_customer_dim._scd_version
    IS 'SCD Type 2 version number, increments with each change'
""")

Table Ownership and Access Control

Implement role-based access control at the table and column level.


# Set table ownership (Unity Catalog syntax uses SET OWNER TO)
spark.sql("""
    ALTER TABLE silver_customer_dim
    SET OWNER TO `data_engineering_team@company.com`
""")

# Grant read access to the analytics team
spark.sql("""
    GRANT SELECT ON TABLE silver_customer_dim
    TO `analytics_team@company.com`
""")

# Grant write access (insert/update) to the data pipeline service principal
spark.sql("""
    GRANT INSERT, UPDATE ON TABLE silver_customer_dim
    TO `databricks_job_service@company.com`
""")

# Deny delete access (legacy table ACL syntax; prevents accidental deletes)
spark.sql("""
    DENY DELETE ON TABLE silver_customer_dim
    TO `all_users@company.com`
""")

# Column-level masking for sensitive data (Unity Catalog syntax;
# the mask function receives the column value as its argument)
spark.sql("""
    CREATE OR REPLACE FUNCTION email_mask(email STRING)
    RETURNS STRING
    RETURN CASE
        WHEN is_account_group_member('pii_viewers') THEN email
        ELSE '***@***.***'
    END
""")

# Attach the mask to the column
spark.sql("""
    ALTER TABLE silver_customer_dim
    ALTER COLUMN email
    SET MASK email_mask
""")

Partitioning Strategies

Partition Silver tables for query performance and lifecycle management.


# Strategy 1: Partition by business date (most common)
spark.sql("""
    CREATE TABLE silver_order_fact
    USING DELTA
    PARTITIONED BY (order_date)
    AS
    SELECT
        order_id,
        customer_id,
        order_amount,
        order_date,
        created_at
    FROM bronze.order_fact
""")

# Strategy 2: Partition by year/month for large tables
spark.sql("""
    CREATE TABLE silver_transaction_fact
    USING DELTA
    PARTITIONED BY (transaction_year, transaction_month)
    AS
    SELECT
        transaction_id,
        amount,
        YEAR(transaction_date) as transaction_year,
        MONTH(transaction_date) as transaction_month,
        transaction_date
    FROM bronze.transaction_fact
""")

# Strategy 3: Partition by ingestion date for incremental loads
# (use the load date, not the event timestamp, so late-arriving events
# land in the partition for the run that ingested them)
spark.sql("""
    CREATE TABLE silver_event_fact
    USING DELTA
    PARTITIONED BY (_ingestion_date)
    AS
    SELECT
        event_id,
        event_type,
        event_timestamp,
        CURRENT_DATE() as _ingestion_date
    FROM bronze.event_fact
""")

# Partition pruning verification
spark.sql("""
    SELECT *
    FROM silver_order_fact
    WHERE order_date >= '2026-01-01'
""").explain()  # The physical plan should show a PartitionFilters entry on order_date

OPTIMIZE and ZORDER for Query Patterns

Use OPTIMIZE and ZORDER to organize data for expected query patterns.


# Basic OPTIMIZE: compact small files into fewer, larger files
spark.sql("""
    OPTIMIZE silver_customer_dim
""")

# OPTIMIZE with ZORDER: co-locate related data
# ZORDER on customer_id means queries filtering on customer_id will scan fewer files
spark.sql("""
    OPTIMIZE silver_order_fact
    ZORDER BY customer_id
""")

# ZORDER multiple columns: first column has most weight
spark.sql("""
    OPTIMIZE silver_inventory_fact
    ZORDER BY warehouse_id, product_id
""")

# Automated optimization with TBLPROPERTIES
spark.sql("""
    ALTER TABLE silver_customer_dim
    SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")

# Monitor optimization performance
spark.sql("""
    DESCRIBE DETAIL silver_customer_dim
""").select("numFiles", "sizeInBytes").show()

# Before OPTIMIZE: 10,000 small files, 500MB
# After OPTIMIZE + ZORDER: 50 larger files, 500MB (much faster queries)

Monitoring Data Freshness and Completeness

Implement metrics tracking and alerting for data pipeline health.


from pyspark.sql import functions as F
from datetime import datetime

class SilverMonitor:
    """Monitor Silver layer data freshness and quality."""

    def __init__(self, spark_session):
        self.spark = spark_session

    def monitor_freshness(self, table_path, timestamp_column, sla_hours=24):
        """Check if data is within SLA freshness."""
        df = self.spark.read.format("delta").load(table_path)

        latest_timestamp = df.agg(F.max(F.col(timestamp_column))).collect()[0][0]
        if latest_timestamp is None:
            hours_behind = float("inf")  # empty table: always stale
        else:
            hours_behind = (datetime.utcnow() - latest_timestamp).total_seconds() / 3600

        freshness_ok = hours_behind <= sla_hours

        return {
            "table": table_path,
            "latest_data_timestamp": latest_timestamp,
            "hours_behind": hours_behind,
            "sla_hours": sla_hours,
            "freshness_ok": freshness_ok,
            "check_timestamp": datetime.utcnow().isoformat(),
        }

    def monitor_completeness(self, table_path, expected_row_count, tolerance_pct=10):
        """Check if table has expected row count (within tolerance)."""
        df = self.spark.read.format("delta").load(table_path)

        actual_count = df.count()
        expected_low = expected_row_count * (1 - tolerance_pct / 100)
        expected_high = expected_row_count * (1 + tolerance_pct / 100)

        completeness_ok = expected_low <= actual_count <= expected_high

        return {
            "table": table_path,
            "actual_row_count": actual_count,
            "expected_row_count": expected_row_count,
            "tolerance_pct": tolerance_pct,
            "completeness_ok": completeness_ok,
            "check_timestamp": datetime.utcnow().isoformat(),
        }

    def monitor_duplicates(self, table_path, key_columns):
        """Detect duplicate business keys."""
        df = self.spark.read.format("delta").load(table_path)

        dedup_count = df.select(*key_columns).distinct().count()
        total_count = df.count()
        duplicate_count = total_count - dedup_count

        # Named "duplicates_ok" to match the "{check_type}_ok" pattern
        # that report_all uses to detect failures
        duplicates_ok = duplicate_count == 0

        return {
            "table": table_path,
            "key_columns": key_columns,
            "total_rows": total_count,
            "unique_keys": dedup_count,
            "duplicate_rows": duplicate_count,
            "duplicates_ok": duplicates_ok,
            "check_timestamp": datetime.utcnow().isoformat(),
        }

    def report_all(self, tables_config):
        """Run all checks and generate report."""
        results = []

        for table_name, config in tables_config.items():
            # Freshness check
            freshness = self.monitor_freshness(
                config["path"],
                config["timestamp_col"],
                config.get("sla_hours", 24)
            )
            results.append({"check_type": "freshness", **freshness})

            # Completeness check
            if "expected_row_count" in config:
                completeness = self.monitor_completeness(
                    config["path"],
                    config["expected_row_count"],
                    config.get("tolerance_pct", 10)
                )
                results.append({"check_type": "completeness", **completeness})

            # Duplicate check
            if "key_columns" in config:
                duplicates = self.monitor_duplicates(
                    config["path"],
                    config["key_columns"]
                )
                results.append({"check_type": "duplicates", **duplicates})

        # Write to monitoring table (one stable schema for every check type;
        # the full result dict is serialized into a string payload)
        report_df = self.spark.createDataFrame(
            [(r["check_type"], r["table"], str(r)) for r in results],
            schema="check_type STRING, table STRING, payload STRING",
        )
        report_df.write \
            .format("delta") \
            .mode("append") \
            .save("/mnt/data/silver/_monitoring_metrics")

        # Check for failures
        failures = [r for r in results if not r.get(f"{r['check_type']}_ok", True)]

        if failures:
            self._send_alert(failures)

        return report_df

    def _send_alert(self, failures):
        """Send alert for monitoring failures."""
        print(f"ALERT: {len(failures)} monitoring checks failed:")
        for failure in failures:
            print(f"  {failure['check_type']}: {failure['table']}")
            # In production: send to Slack, PagerDuty, email, etc.

# Usage
monitor = SilverMonitor(spark)

tables_config = {
    "dim_customer": {
        "path": "/mnt/data/silver/dim_customer",
        "timestamp_col": "updated_at",
        "sla_hours": 24,
        "expected_row_count": 500000,
        "tolerance_pct": 5,
        "key_columns": ["customer_id"],
    },
    "fact_order": {
        "path": "/mnt/data/silver/fact_order",
        "timestamp_col": "created_at",
        "sla_hours": 12,
        "expected_row_count": 10000000,
        "tolerance_pct": 10,
        "key_columns": ["order_id"],
    },
}

monitor.report_all(tables_config)
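The freshness check reduces to simple timestamp arithmetic. A pure-Python sketch of the SLA math used in monitor_freshness (hypothetical timestamps, with the clock fixed so the result is deterministic):

```python
from datetime import datetime, timedelta

def freshness_status(latest_data_ts, now, sla_hours):
    """Hours behind and SLA verdict, mirroring monitor_freshness."""
    hours_behind = (now - latest_data_ts).total_seconds() / 3600
    return {"hours_behind": hours_behind, "freshness_ok": hours_behind <= sla_hours}

now = datetime(2026, 1, 2, 12, 0, 0)
fresh = freshness_status(datetime(2026, 1, 2, 6, 0, 0), now, sla_hours=24)
stale = freshness_status(now - timedelta(hours=30), now, sla_hours=24)

print(fresh["freshness_ok"], fresh["hours_behind"])  # True 6.0
print(stale["freshness_ok"], stale["hours_behind"])  # False 30.0
```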

Conclusion

The Silver layer transforms raw Bronze data into a reliable, governed foundation for analytics. By implementing the design principles, cleansing strategies, deduplication patterns, and quality checks outlined here, you create a stable contract between data producers and consumers. SCD implementations, proper joins, and incremental processing patterns enable efficient downstream analytics at scale. Production patterns around naming, ownership, partitioning, and monitoring ensure the Silver layer remains maintainable and trustworthy as your data estate grows. The code patterns presented are battle-tested across enterprise data warehouses. Start with the foundational concepts—null handling, deduplication, and schema enforcement—then layer in advanced patterns like SCD Type 2 and streaming processing as your needs grow.