Gold Layer: Medallion Architecture in Databricks

1. Overview

The Gold Layer represents the pinnacle of the Medallion Architecture—a curated, business-ready consumption layer purpose-built for analytics, reporting, machine learning, and API serving. While Bronze ingests raw data and Silver applies transformation and deduplication, Gold is the stage where data becomes actionable intelligence aligned with business metrics and domain semantics.

The Gold Layer serves multiple audiences simultaneously: business analysts querying dashboards, data scientists training ML models, BI tools like Tableau and Power BI, downstream APIs exposing data to applications, and data products serving other teams. Every table in Gold is optimized for its consumption pattern—read-heavy, denormalized, pre-aggregated, and governed under defined SLAs.

Unlike Silver, which emphasizes clean, normalized tables suitable for integration and reuse, Gold embraces denormalization for performance. A Gold table might pre-compute monthly revenue totals, join customer attributes into fact tables, or collapse dimensions to support interactive queries. This deliberate tradeoff between storage and query speed reflects Gold's consumer-focused design philosophy.

Gold tables embody business semantics. Column names use business terminology (revenue, customer_segment) rather than technical names. Tables are organized by domain—marketing_gold, finance_gold, product_gold—each with clear ownership. Documentation is mandatory; every table includes TBLPROPERTIES describing refresh cadence, data freshness SLA, and intended consumers.

2. Design Principles

Business-Aligned Naming: Gold tables use business domain language, not technical identifiers. Instead of tbl_agg_005_v3, write gold_marketing_daily_campaign_roi. Every column name should be self-documenting: customer_lifetime_value rather than clv_val. This naming strategy reduces onboarding friction and minimizes semantic errors.

Pre-Computed Aggregations: Gold is where aggregation happens. Rather than force analysts to write window functions daily, Gold pre-computes common KPIs: daily revenue by product category, weekly user engagement by cohort, monthly churn rates by segment. These aggregates are materialized as separate tables, trading storage for query latency.

Denormalization for Read Performance: Gold tables intentionally duplicate data across tables. A fact_sales table includes customer demographic columns even though they exist in dim_customer. This denormalization eliminates joins during query execution, critical for interactive BI dashboards querying billions of rows.

SLA-Driven Refresh Cadence: Every Gold table has a defined refresh frequency tied to business requirements. A daily revenue dashboard needs data refreshed by 8 AM. A real-time recommendation engine needs sub-minute latency. Gold table refresh pipelines are orchestrated with monitoring to catch SLA breaches immediately.
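The SLA check itself is simple arithmetic. A minimal sketch, assuming a hypothetical `freshness_breach` helper fed with the table's last refresh timestamp (which could be pulled from `DESCRIBE HISTORY` on the Delta table):

```python
from datetime import datetime, timedelta

def freshness_breach(last_refresh: datetime, sla_hours: float, now: datetime = None) -> bool:
    """Return True if the table's last refresh is older than its SLA allows."""
    now = now or datetime.utcnow()
    return (now - last_refresh) > timedelta(hours=sla_hours)

# A table with a 24-hour freshness SLA, last refreshed 30 hours ago, is in breach
now = datetime(2024, 6, 2, 12, 0)
assert freshness_breach(datetime(2024, 6, 1, 6, 0), sla_hours=24, now=now)
assert not freshness_breach(datetime(2024, 6, 2, 6, 0), sla_hours=24, now=now)
```

A monitoring job would run this per Gold table against its `sla_freshness_hours` TBLPROPERTY and alert on any breach.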

Versioned Metrics: Metrics evolve. When the definition of revenue or customer_segment changes, Gold maintains versioned tables: gold_finance_monthly_revenue_v1 and gold_finance_monthly_revenue_v2. This prevents breaking downstream consumers while supporting gradual migration to new definitions.
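One common companion pattern, sketched below with a hypothetical helper, is to keep a stable unversioned alias view that the migration pipeline repoints at the current versioned table, so consumers who always want "latest" never change their queries:

```python
def alias_view_sql(base_name: str, current_version: int) -> str:
    """Build the SQL that points a stable alias view at the current versioned table."""
    versioned = f"{base_name}_v{current_version}"
    return f"CREATE OR REPLACE VIEW {base_name} AS SELECT * FROM {versioned}"

# Pinned consumers query ..._v1 or ..._v2 directly; everyone else uses the alias
sql = alias_view_sql("gold_finance_monthly_revenue", 2)
```

In Databricks this statement would be executed via `spark.sql(sql)` as the final step of the version migration.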

Idempotent Rebuilds: Gold pipelines must be replayable. A failed job should be recoverable by re-running the pipeline without corrupting results. This requires MERGE-based incremental loads for facts, full rebuilds for dimensions, and careful handling of late-arriving data and corrections.
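The MERGE contract that makes replays safe can be illustrated with a plain-Python upsert (a toy stand-in for Delta's MERGE INTO, keyed on a hypothetical `order_id`): applying the same batch twice yields the same table as applying it once.

```python
def merge_batch(table: dict, batch: list, key: str = "order_id") -> dict:
    """MERGE-style upsert: matched keys are updated, unmatched keys are inserted."""
    merged = dict(table)
    for row in batch:
        merged[row[key]] = row  # matched -> update, unmatched -> insert
    return merged

table = {1: {"order_id": 1, "net_revenue": 100.0}}
batch = [{"order_id": 1, "net_revenue": 120.0},   # late correction to an existing order
         {"order_id": 2, "net_revenue": 80.0}]    # late-arriving new order

once = merge_batch(table, batch)
twice = merge_batch(once, batch)   # re-running the failed job
assert once == twice               # idempotent: replay does not corrupt results
```

An append-only load would fail this property (replaying would duplicate rows), which is why Gold fact loads use MERGE rather than blind INSERT.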

3. Star Schema Design

Dimensional modeling remains foundational for Gold. Fact tables capture transactional events (sales, clicks, orders); dimension tables describe business entities (customers, products, time). This structure maximizes query performance and analytical flexibility.

Building Fact Tables from Silver: A Silver table containing normalized transaction data becomes a Gold fact table optimized for BI consumption. Dimensions are joined in, denormalized attributes added, and the result is a wide fact table supporting rapid slicing and dicing.


from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_date, round, coalesce, lag,
    when, sum as spark_sum, avg as spark_avg,
    window, dense_rank
)
from pyspark.sql.window import Window
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("GoldFactBuilder") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Read Silver layer normalized transaction data
silver_orders = spark.read.table("silver.orders")
silver_customers = spark.read.table("silver.customers")
silver_products = spark.read.table("silver.products")
silver_dates = spark.read.table("silver.dates")

# Build fact_sales with denormalized attributes
fact_sales = (
    silver_orders
    .join(silver_customers.select(
        col("customer_id"),
        col("customer_name"),
        col("segment").alias("customer_segment"),
        col("country"),
        col("created_at").alias("customer_created_at")
    ), on="customer_id", how="left")
    .join(silver_products.select(
        col("product_id"),
        col("product_name"),
        col("category"),
        col("subcategory"),
        col("unit_cost")
    ), on="product_id", how="left")
    .join(silver_dates.select(
        col("date_id"),
        col("date"),
        col("year"),
        col("quarter"),
        col("month"),
        col("day_of_week"),
        col("is_weekend")
    ), on="date_id", how="left")
    .select(
        col("order_id"),
        col("order_date"),
        col("customer_id"),
        col("customer_name"),
        col("customer_segment"),
        col("country"),
        col("product_id"),
        col("product_name"),
        col("category"),
        col("subcategory"),
        col("quantity"),
        col("unit_price"),
        (col("quantity") * col("unit_price")).alias("gross_revenue"),
        col("discount"),
        (col("quantity") * col("unit_price") * (1 - col("discount"))).alias("net_revenue"),
        (col("quantity") * col("unit_cost")).alias("cost_of_goods"),
        (col("quantity") * col("unit_price") * (1 - col("discount")) - col("quantity") * col("unit_cost")).alias("gross_profit"),
        col("year"),
        col("quarter"),
        col("month"),
        col("day_of_week"),
        col("is_weekend"),
        col("customer_created_at"),
        col("currency"),
        col("payment_method"),
        col("order_status")
    )
    .filter(col("order_status") != "cancelled")
    .withColumn("profit_margin",
        round(col("gross_profit") / col("gross_revenue"), 4))
)

# Write the fact table; ZORDER is applied afterwards via OPTIMIZE,
# since it is not a DataFrameWriter option
fact_sales.write \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .format("delta") \
    .save("dbfs:/gold/fact_sales")

# Cluster for common BI predicates on customer, product, date
spark.sql("""
    OPTIMIZE delta.`dbfs:/gold/fact_sales`
    ZORDER BY (customer_id, product_id, order_date)
""")

# Register table with comprehensive metadata
spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.fact_sales
    USING DELTA
    LOCATION 'dbfs:/gold/fact_sales'
    COMMENT 'Denormalized sales fact table with customer and product attributes for BI consumption'
    TBLPROPERTIES (
        'owner' = 'analytics-platform-team',
        'refresh_frequency' = 'daily',
        'refresh_time_utc' = '06:00',
        'sla_freshness_hours' = '24',
        'pii_columns' = 'customer_name,country',
        'sensitive' = 'false',
        'retention_days' = '2555',
        'version' = '2.1',
        'lineage' = 'silver.orders -> gold.fact_sales'
    )
""")

print("Fact table gold.fact_sales created successfully")

SQL CREATE TABLE Example: Explicit table creation with proper data types and comprehensive documentation.


CREATE TABLE IF NOT EXISTS gold.fact_sales (
    order_id BIGINT NOT NULL COMMENT 'Unique order identifier, PK',
    order_date DATE NOT NULL COMMENT 'Order transaction date',
    customer_id BIGINT NOT NULL COMMENT 'Customer FK, links to dim_customer',
    customer_name STRING NOT NULL COMMENT 'Denormalized customer name for BI readability',
    customer_segment STRING COMMENT 'Customer business segment (Premium, Standard, Budget)',
    country STRING COMMENT 'Customer billing country',
    product_id BIGINT NOT NULL COMMENT 'Product FK, links to dim_product',
    product_name STRING NOT NULL COMMENT 'Denormalized product name',
    category STRING COMMENT 'Product category for hierarchical slicing',
    quantity INT NOT NULL COMMENT 'Units ordered',
    unit_price DECIMAL(10, 2) NOT NULL COMMENT 'Price per unit at time of order',
    gross_revenue DECIMAL(15, 2) NOT NULL COMMENT 'Quantity * unit_price before discount',
    discount DECIMAL(4, 3) NOT NULL COMMENT 'Applied discount as decimal (0.00 to 1.00)',
    net_revenue DECIMAL(15, 2) NOT NULL COMMENT 'Revenue after discount, primary KPI',
    cost_of_goods DECIMAL(15, 2) NOT NULL COMMENT 'COGS for margin calculation',
    gross_profit DECIMAL(15, 2) NOT NULL COMMENT 'net_revenue - cost_of_goods',
    profit_margin DECIMAL(5, 4) NOT NULL COMMENT 'gross_profit / gross_revenue, typically 0.0-1.0',
    year INT NOT NULL COMMENT 'Fiscal year for time-based filtering',
    month INT NOT NULL COMMENT 'Month (1-12) for cohort analysis',
    day_of_week STRING COMMENT 'Day name (Monday, Tuesday, ...) for pattern analysis',
    is_weekend BOOLEAN COMMENT 'true if order placed on weekend',
    payment_method STRING COMMENT 'Payment type (credit_card, paypal, bank_transfer)',
    order_status STRING COMMENT 'Current order status (pending, shipped, delivered, returned)',
    _updated_at TIMESTAMP NOT NULL COMMENT 'When this row was last updated',
    _updated_date DATE GENERATED ALWAYS AS (CAST(_updated_at AS DATE))
)
USING DELTA
PARTITIONED BY (year, month)
COMMENT 'Denormalized sales fact table with customer and product attributes. Primary consumption layer for BI dashboards. Refreshed daily at 06:00 UTC. SLA: data available by 06:30 UTC.'
TBLPROPERTIES (
    'owner' = 'analytics-team',
    'refresh_frequency' = 'daily',
    'refresh_sla_minutes' = '30',
    'pii_columns' = 'customer_name,country,payment_method',
    'metric_definitions' = 'gross_profit=net_revenue-cost_of_goods,profit_margin=gross_profit/gross_revenue',
    'version' = '2.1'
);

-- Delta Lake does not support CREATE INDEX; cluster the common
-- filter columns with ZORDER instead
OPTIMIZE gold.fact_sales ZORDER BY (customer_id, product_id, order_date);

4. Aggregate Tables

Pre-computed aggregations eliminate expensive GROUP BY operations on large fact tables. Gold materializes common rollups: daily/weekly/monthly summaries, running totals, moving averages, and cohort analyses. These tables serve dashboards requesting the same aggregations millions of times daily.


from pyspark.sql.functions import (
    col, sum as spark_sum, avg as spark_avg, count,
    countDistinct, min as spark_min, max as spark_max,
    stddev, lag, rank, when, round, date_trunc,
    date_sub, date_add
)
from pyspark.sql.window import Window
from datetime import datetime, timedelta

# Build gold_finance_daily_revenue: aggregated daily metrics by customer segment
daily_revenue = (
    spark.read.table("gold.fact_sales")
    .filter(col("order_status").isin(["shipped", "delivered"]))
    .groupBy(
        col("order_date").alias("revenue_date"),
        col("customer_segment"),
        col("category")
    )
    .agg(
        spark_sum("net_revenue").alias("total_revenue"),
        spark_sum("gross_profit").alias("total_profit"),
        spark_sum("quantity").alias("total_units"),
        count("order_id").alias("transaction_count"),
        spark_avg("profit_margin").alias("avg_profit_margin"),
        spark_max("order_id").alias("max_order_id"),
        spark_min("unit_price").alias("min_unit_price"),
        spark_max("unit_price").alias("max_unit_price")
    )
    .withColumn("profit_ratio", round(col("total_profit") / col("total_revenue"), 4))
    .withColumn("avg_transaction_value", round(col("total_revenue") / col("transaction_count"), 2))
)

# Apply window functions for period-over-period comparison.
# Range frames require a numeric ordering column, so order by the
# date cast to unix seconds; frame offsets below are in seconds.
window_spec = Window \
    .partitionBy(col("customer_segment"), col("category")) \
    .orderBy(col("revenue_date").cast("timestamp").cast("long"))

daily_revenue_with_trends = (
    daily_revenue
    .withColumn("prior_day_revenue", lag("total_revenue").over(window_spec))
    .withColumn("revenue_growth_pct", round(
        ((col("total_revenue") - col("prior_day_revenue")) / col("prior_day_revenue")) * 100, 2
    ))
    .withColumn("revenue_30day_avg", spark_avg("total_revenue")
        .over(window_spec.rangeBetween(-29 * 86400, 0)))
    .withColumn("revenue_7day_sum", spark_sum("total_revenue")
        .over(window_spec.rangeBetween(-6 * 86400, 0)))
    .withColumn("rank_by_segment", rank()
        .over(Window.partitionBy("customer_segment").orderBy(col("total_revenue").desc())))
)

# Write partitioned by date; apply ZORDER afterwards via OPTIMIZE
daily_revenue_with_trends.write \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .partitionBy("revenue_date") \
    .format("delta") \
    .save("dbfs:/gold/daily_revenue")

spark.sql("""
    OPTIMIZE delta.`dbfs:/gold/daily_revenue`
    ZORDER BY (customer_segment, category)
""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.gold_finance_daily_revenue
    USING DELTA
    LOCATION 'dbfs:/gold/daily_revenue'
    COMMENT 'Pre-aggregated daily revenue metrics by segment and category for dashboard consumption'
    TBLPROPERTIES (
        'owner' = 'finance-analytics',
        'refresh_frequency' = 'daily',
        'refresh_time_utc' = '08:00',
        'sla_freshness_hours' = '2',
        'metric_type' = 'aggregate',
        'grain' = 'daily',
        'dimensions' = 'customer_segment,category',
        'measures' = 'total_revenue,total_profit,transaction_count'
    )
""")

# ============================================================================
# Weekly user engagement aggregates with cumulative metrics
# ============================================================================

weekly_engagement = (
    spark.read.table("gold.fact_sales")
    # Truncate to the ISO week start (Monday)
    .withColumn("week_start", date_trunc("week", col("order_date")).cast("date"))
    .groupBy("week_start", col("customer_segment"))
    .agg(
        countDistinct("customer_id").alias("unique_customers"),
        count("order_id").alias("total_orders"),
        spark_sum("net_revenue").alias("weekly_revenue"),
        spark_sum("quantity").alias("total_items_ordered"),
        spark_avg("net_revenue").alias("avg_order_value"),
        spark_max("order_date").alias("last_transaction_date")
    )
)

# Add cohort and retention analysis
cohort_window = Window \
    .partitionBy("customer_segment") \
    .orderBy(col("week_start"))

weekly_engagement_enriched = (
    weekly_engagement
    .withColumn("revenue_rank", rank().over(
        Window.partitionBy("customer_segment").orderBy(col("weekly_revenue").desc())
    ))
    .withColumn("cumulative_revenue", spark_sum("weekly_revenue").over(cohort_window))
    .withColumn("weeks_active_cumulative",
        count("week_start").over(cohort_window))
    .withColumn("prior_week_revenue",
        lag("weekly_revenue").over(cohort_window))
    .withColumn("revenue_trend",
        when(col("prior_week_revenue").isNull(), "first_week")
        .when(col("weekly_revenue") > col("prior_week_revenue"), "growth")
        .when(col("weekly_revenue") < col("prior_week_revenue"), "decline")
        .otherwise("flat"))
)

weekly_engagement_enriched.write \
    .mode("overwrite") \
    .partitionBy("week_start") \
    .format("delta") \
    .save("dbfs:/gold/weekly_engagement")

spark.sql("OPTIMIZE delta.`dbfs:/gold/weekly_engagement` ZORDER BY (customer_segment)")

spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.gold_product_weekly_engagement
    USING DELTA
    LOCATION 'dbfs:/gold/weekly_engagement'
    COMMENT 'Weekly engagement metrics with trend analysis and cumulative retention metrics'
    TBLPROPERTIES (
        'owner' = 'product-analytics',
        'refresh_frequency' = 'weekly',
        'grain' = 'weekly'
    )
""")

print("Aggregate tables created: daily_revenue, weekly_engagement")

5. Materialized Views & Delta Live Tables

Delta Live Tables (DLT) provide declarative data pipeline definitions for Gold layer materialization. Rather than imperative Spark jobs, DLT pipelines specify data quality expectations, dependencies, and refresh modes (batch vs streaming) through Python decorators and SQL.


# File: gold_pipeline.py
# Attached to a DLT pipeline, e.g.:
#   databricks pipelines create --json @dlt_config.json

import dlt
from pyspark.sql.functions import (
    col, sum as spark_sum, avg as spark_avg,
    count, when, round, lag,
    max as spark_max
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

@dlt.table(
    name="fact_sales_dlt",
    comment="DLT-managed fact table with quality expectations",
    table_properties={
        "owner": "analytics-platform",
        "refresh_frequency": "daily",
        "quality_tier": "gold"
    },
    partition_cols=["year", "month"]
)
@dlt.expect_or_fail("valid_order_ids", "order_id IS NOT NULL")
@dlt.expect_or_drop("positive_revenue", "net_revenue > 0")
@dlt.expect_or_drop("valid_customer", "customer_id > 0")
@dlt.expect("complete_data",
    "product_id IS NOT NULL AND customer_id IS NOT NULL AND order_date IS NOT NULL")
def fact_sales_dlt():
    """
    Production fact table with quality gates.
    Rows failing 'fail' expectations block table refresh.
    Rows failing 'drop' expectations are silently removed.
    Rows failing 'warn' expectations are logged but included.
    """
    return (
        dlt.read("silver.orders")
        .join(dlt.read("silver.customers"), "customer_id", "left")
        .join(dlt.read("silver.products"), "product_id", "left")
        .select(
            col("order_id"),
            col("order_date"),
            col("customer_id"),
            col("customer_name"),
            col("customer_segment"),
            col("product_id"),
            col("product_name"),
            col("category"),
            col("quantity"),
            col("unit_price"),
            (col("quantity") * col("unit_price")).alias("gross_revenue"),
            (col("quantity") * col("unit_price") * (1 - col("discount"))).alias("net_revenue"),
            col("order_status"),
            col("_updated_at"),
            col("year"),
            col("month")
        )
    )

@dlt.view(
    name="daily_revenue_base",
    comment="Temporary view: daily aggregates before enrichment"
)
def daily_revenue_base():
    """Pre-aggregation before adding window functions"""
    return (
        dlt.read("fact_sales_dlt")
        .filter(col("order_status") != "cancelled")
        .groupBy(col("order_date"), col("customer_segment"))
        .agg(
            spark_sum("net_revenue").alias("daily_revenue"),
            spark_sum("quantity").alias("units_sold"),
            count("order_id").alias("transactions"),
            avg("unit_price").alias("avg_unit_price")
        )
    )

@dlt.table(
    name="daily_revenue_metrics",
    comment="Pre-aggregated daily revenue with trends",
    table_properties={
        "owner": "finance-analytics",
        "sla_freshness_hours": "2",
        "metric_type": "kpi"
    },
    partition_cols=["order_date"]
)
@dlt.expect_or_drop("positive_revenue", col("daily_revenue") >= 0)
@dlt.expect_or_warn("min_transactions", col("transactions") >= 1)
def daily_revenue_metrics():
    """
    KPI table for BI dashboards.
    Includes prior day comparison and growth rates.
    """
    from pyspark.sql.window import Window

    # Range frames need a numeric ordering column: order by unix seconds
    window_spec = Window \
        .partitionBy(col("customer_segment")) \
        .orderBy(col("order_date").cast("timestamp").cast("long"))

    return (
        dlt.read("daily_revenue_base")
        .withColumn("prior_day_revenue",
            lag("daily_revenue").over(window_spec))
        .withColumn("revenue_growth_pct", round(
            ((col("daily_revenue") - col("prior_day_revenue")) /
             col("prior_day_revenue")) * 100, 2))
        .withColumn("rolling_7day_avg", spark_avg("daily_revenue")
            .over(window_spec.rangeBetween(-6 * 86400, 0)))
    )

@dlt.table(
    name="streaming_fact_sales",
    comment="Streaming ingest of real-time sales events",
    table_properties={
        "owner": "analytics-platform",
        "refresh_frequency": "real-time",
        "latency_sla_seconds": "60"
    }
)
def streaming_fact_sales():
    """
    Streaming table consuming from Kafka/Event Hub.
    Auto-scales with event volume.
    """
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-cluster:9092")
        .option("subscribe", "orders-topic")
        .option("startingOffsets", "latest")
        .load()
        .select(
            col("value").cast("string"),
            col("timestamp")
        )
        # Parse JSON and transform...
    )

@dlt.table(
    name="customer_ltv_gold",
    comment="ML feature table: customer lifetime value",
    table_properties={
        "owner": "data-science-team",
        "feature_store_registered": "true",
        "refresh_frequency": "daily"
    }
)
def customer_ltv():
    """
    Feature table for ML models predicting churn and upsell.
    Integrated with Databricks Feature Store.
    """
    return (
        dlt.read("fact_sales_dlt")
        .groupBy("customer_id")
        .agg(
            spark_sum("net_revenue").alias("total_revenue"),
            count("order_id").alias("order_count"),
            spark_avg("net_revenue").alias("avg_order_value"),
            spark_max("order_date").alias("last_purchase_date")
        )
        .withColumn("ltv_segment",
            when(col("total_revenue") > 100000, "vip")
            .when(col("total_revenue") > 50000, "high_value")
            .when(col("total_revenue") > 10000, "medium_value")
            .otherwise("low_value"))
    )

DLT Pipeline Configuration: Orchestrate pipeline with monitoring and alerting.


# File: dlt_config.json
{
  "name": "gold_medallion_pipeline",
  "storage": "dbfs:/pipelines/gold",
  "configuration": {
    "environment": "production"
  },
  "clusters": [
    {
      "label": "default",
      "node_type_id": "i3.xlarge",
      "autoscale": {
        "min_workers": 2,
        "max_workers": 8
      },
      "aws_attributes": {
        "availability": "SPOT_WITH_FALLBACK"
      }
    }
  ],
  "libraries": [
    {
      "notebook": {
        "path": "/Repos/my-org/databricks/gold_pipeline.py"
      }
    }
  ],
  "continuous": false,
  "channel": "ENHANCED",
  "edition": "ADVANCED",
  "notifications": [
    {
      "email_address": "analytics-team@company.com",
      "on_failure": true,
      "on_update": false
    }
  ]
}

6. Feature Store Tables

ML-ready feature tables are a specialized Gold artifact. Unlike BI tables optimized for slice-and-dice queries, feature tables are optimized for point-in-time joins during model training and inference. Databricks Feature Store provides versioning, freshness monitoring, and automatic lineage tracking.


from databricks.feature_store import FeatureStoreClient
from pyspark.sql.functions import (
    col, lit, sum as spark_sum, avg as spark_avg,
    max as spark_max, min as spark_min, count, stddev,
    datediff, when, date_sub, date_add,
    current_date, to_date, to_timestamp
)
from pyspark.sql.window import Window
from datetime import datetime, timedelta

fs = FeatureStoreClient()

# ============================================================================
# Customer behavior features: RFM (Recency, Frequency, Monetary)
# ============================================================================

def create_customer_rfm_features(reference_date: str = None):
    """
    Build RFM feature table for churn prediction and segmentation.

    Args:
        reference_date: Date to compute features against (for backtesting)
    """
    if reference_date is None:
        reference_date = datetime.now().strftime("%Y-%m-%d")

    ref_date = f"to_date('{reference_date}')"

    rfm_features = (
        spark.read.table("gold.fact_sales")
        .where(col("order_status").isin(["shipped", "delivered"]))
        .groupBy("customer_id", "customer_segment")
        .agg(
            # Recency: days since last purchase
            datediff(ref_date, spark_max("order_date"))
                .alias("recency_days"),
            # Frequency: purchase count in last 365 days
            count("order_id").alias("frequency_12m"),
            # Monetary: total revenue
            spark_sum("net_revenue").alias("monetary_value_12m"),
            # Additional behavioral features
            avg("net_revenue").alias("avg_order_value"),
            max("net_revenue").alias("max_order_value"),
            min("net_revenue").alias("min_order_value"),
            spark_sum("quantity").alias("total_items_purchased"),
            count(when(col("is_weekend"), 1)).alias("weekend_purchase_count")
        )
        .withColumn("feature_timestamp", col(f"to_timestamp({ref_date})"))
    )

    # Create or update the feature table in the Feature Store;
    # timestamp_keys enables point-in-time lookups on feature_timestamp
    fs.create_table(
        name="customer_rfm_features",
        primary_keys=["customer_id"],
        timestamp_keys=["feature_timestamp"],
        df=rfm_features,
        description="RFM features for churn prediction (Recency, Frequency, Monetary)",
        tags={"team": "data-science", "use_case": "churn_prediction"}
    )

    return rfm_features

# ============================================================================
# Time-series product features: trend, seasonality, volatility
# ============================================================================

def create_product_timeseries_features():
    """
    Build time-series features for demand forecasting and anomaly detection.
    Includes trend (linear regression slope) and volatility (std dev).
    """

    daily_product_sales = (
        spark.read.table("gold.fact_sales")
        .filter(col("order_status") != "cancelled")
        .groupBy("order_date", "product_id", "category")
        .agg(
            spark_sum("quantity").alias("daily_units"),
            spark_sum("net_revenue").alias("daily_revenue"),
            count("order_id").alias("daily_transactions")
        )
    )

    # Rolling statistics per product. Range frames need a numeric ordering
    # column, so order by unix seconds; frame offsets below are in seconds.
    order_by_secs = col("order_date").cast("timestamp").cast("long")

    window_7d = Window \
        .partitionBy("product_id") \
        .orderBy(order_by_secs) \
        .rangeBetween(-6 * 86400, 0)

    window_30d = Window \
        .partitionBy("product_id") \
        .orderBy(order_by_secs) \
        .rangeBetween(-29 * 86400, 0)

    window_90d = Window \
        .partitionBy("product_id") \
        .orderBy(order_by_secs) \
        .rangeBetween(-89 * 86400, 0)  # trailing 90 days

    product_features = (
        daily_product_sales
        .withColumn("revenue_7day_avg",
            spark_avg("daily_revenue").over(window_7d))
        .withColumn("revenue_30day_avg",
            spark_avg("daily_revenue").over(window_30d))
        .withColumn("revenue_volatility_90d",
            stddev("daily_revenue").over(window_90d))
        .withColumn("trend_direction",
            when(col("revenue_7day_avg") > col("revenue_30day_avg"), "up")
            .when(col("revenue_7day_avg") < col("revenue_30day_avg"), "down")
            .otherwise("stable"))
        .filter(col("order_date") >= date_sub(current_date(), 90))
    )

    fs.create_table(
        name="product_timeseries_features",
        primary_keys=["product_id"],
        timestamp_keys=["order_date"],
        df=product_features,
        description="Time-series features: rolling averages, volatility, trend direction",
        tags={"use_case": "demand_forecasting"}
    )

    return product_features

# ============================================================================
# Point-in-time correct features for ML model training
# ============================================================================

def create_training_dataset_with_pit_correctness(
    training_start_date: str,
    training_end_date: str,
    label_lookback_days: int = 30
):
    """
    Build training dataset with point-in-time correctness.
    Features computed at training date, labels computed from future data.

    Args:
        training_start_date: YYYY-MM-DD
        training_end_date: YYYY-MM-DD
        label_lookback_days: Days ahead to compute churn label
    """

    # Features: snapshot as of training date
    feature_snapshot = (
        fs.read_table("customer_rfm_features")
        .where(col("feature_timestamp").cast("date") >= training_start_date)
        .where(col("feature_timestamp").cast("date") <= training_end_date)
    )

    # Labels: whether customer churned in next 30 days
    future_activity = (
        spark.read.table("gold.fact_sales")
        .where(col("order_date") > training_end_date)
        .where(col("order_date") <= date_add(training_end_date, label_lookback_days))
        .groupBy("customer_id")
        .agg(count("order_id").alias("future_purchase_count"))
    )

    training_data = (
        feature_snapshot
        .join(future_activity, on="customer_id", how="left")
        .fillna(0, ["future_purchase_count"])
        .withColumn("churn_label",
            when(col("future_purchase_count") == 0, 1).otherwise(0))
        .select(
            "customer_id",
            "recency_days",
            "frequency_12m",
            "monetary_value_12m",
            "avg_order_value",
            "customer_segment",
            "churn_label"
        )
    )

    training_data.write \
        .mode("overwrite") \
        .format("delta") \
        .save("dbfs:/ml/training_datasets/churn_pit_correct")

    return training_data

# Execute feature creation
create_customer_rfm_features()
create_product_timeseries_features()
training_ds = create_training_dataset_with_pit_correctness(
    training_start_date="2024-01-01",
    training_end_date="2024-12-31",
    label_lookback_days=30
)

print("Feature Store tables created: customer_rfm_features, product_timeseries_features")

7. Serving Layer Patterns

Gold tables serve multiple downstream systems. Databricks SQL Warehouses provide low-latency access for BI tools, serverless endpoints expose data via REST APIs, and clustering and caching strategies keep query performance predictable even at scale.


-- Databricks SQL: Optimized queries for BI tool consumption

-- Dashboard Query 1: Real-time revenue dashboard (refreshes every 5 minutes)
SELECT
    CAST(order_date AS DATE) as revenue_date,
    customer_segment,
    category,
    ROUND(SUM(net_revenue), 2) as daily_revenue,
    ROUND(SUM(net_revenue) - SUM(cost_of_goods), 2) as daily_profit,
    ROUND(SUM(gross_profit) / SUM(gross_revenue), 4) as profit_margin,
    COUNT(DISTINCT customer_id) as unique_customers,
    COUNT(order_id) as transaction_count,
    ROUND(SUM(net_revenue) / COUNT(order_id), 2) as avg_order_value
FROM gold.fact_sales
WHERE order_status IN ('shipped', 'delivered')
    AND order_date >= DATE_SUB(CURRENT_DATE, 90)
GROUP BY
    CAST(order_date AS DATE),
    customer_segment,
    category
ORDER BY
    revenue_date DESC,
    daily_revenue DESC;

-- Dashboard Query 2: Customer segmentation with RFM quintiles
WITH rfm_scores AS (
    SELECT
        customer_id,
        customer_name,
        customer_segment,
        country,
        MAX(order_date) as last_purchase_date,
        DATEDIFF(CURRENT_DATE, MAX(order_date)) as recency_days,
        COUNT(DISTINCT order_id) as frequency_count,
        SUM(net_revenue) as monetary_value,
        PERCENT_RANK() OVER (
            PARTITION BY customer_segment
            ORDER BY MAX(order_date)    -- ascending: recent buyers rank highest
        ) as recency_rank,
        PERCENT_RANK() OVER (
            PARTITION BY customer_segment
            ORDER BY COUNT(DISTINCT order_id)
        ) as frequency_rank,
        PERCENT_RANK() OVER (
            PARTITION BY customer_segment
            ORDER BY SUM(net_revenue)
        ) as monetary_rank
    FROM gold.fact_sales
    WHERE order_status = 'delivered'
    GROUP BY customer_id, customer_name, customer_segment, country
)
SELECT
    customer_id,
    customer_name,
    customer_segment,
    recency_days,
    frequency_count,
    ROUND(monetary_value, 2) as lifetime_value,
    CASE
        WHEN recency_rank >= 0.8 AND frequency_rank >= 0.8 AND monetary_rank >= 0.8 THEN 'VIP'
        WHEN recency_rank >= 0.6 AND monetary_rank >= 0.7 THEN 'High Value'
        WHEN frequency_rank >= 0.7 THEN 'Engaged'
        WHEN recency_days > 180 THEN 'At Risk'
        ELSE 'Standard'
    END as customer_classification
FROM rfm_scores
WHERE last_purchase_date >= DATE_SUB(CURRENT_DATE, 365)
ORDER BY monetary_value DESC;

-- Dashboard Query 3: Cohort analysis (week-over-week growth within each month)
SELECT
    DATE_TRUNC('MONTH', order_date) as cohort_month,
    DATE_TRUNC('WEEK', order_date) as week_start,
    customer_segment,
    COUNT(DISTINCT customer_id) as weekly_active_customers,
    ROUND(SUM(net_revenue), 2) as weekly_revenue,
    ROUND(SUM(net_revenue) / COUNT(DISTINCT customer_id), 2) as revenue_per_customer,
    LAG(SUM(net_revenue)) OVER (
        PARTITION BY customer_segment, DATE_TRUNC('MONTH', order_date)
        ORDER BY DATE_TRUNC('WEEK', order_date)
    ) as prior_week_revenue,
    ROUND(
        (SUM(net_revenue) - LAG(SUM(net_revenue)) OVER (
            PARTITION BY customer_segment, DATE_TRUNC('MONTH', order_date)
            ORDER BY DATE_TRUNC('WEEK', order_date)
        )) / LAG(SUM(net_revenue)) OVER (
            PARTITION BY customer_segment, DATE_TRUNC('MONTH', order_date)
            ORDER BY DATE_TRUNC('WEEK', order_date)
        ) * 100, 2
    ) as week_over_week_growth_pct
FROM gold.fact_sales
WHERE order_status IN ('shipped', 'delivered')
    AND order_date >= DATE_SUB(CURRENT_DATE, 365)
GROUP BY
    DATE_TRUNC('MONTH', order_date),
    DATE_TRUNC('WEEK', order_date),
    customer_segment
ORDER BY cohort_month DESC, week_start DESC;
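The segment rules in Dashboard Query 2's CASE expression are worth unit-testing outside the warehouse, because branch order determines the result. A minimal pure-Python mirror of that logic (the function name is illustrative, not part of the schema):

```python
def classify_customer(recency_rank, frequency_rank, monetary_rank, recency_days):
    """Mirror of the CASE expression in Dashboard Query 2.

    Ranks are PERCENT_RANK percentiles in [0, 1]; recency_days is days
    since the last purchase. Like SQL CASE, the first matching rule wins.
    """
    if recency_rank >= 0.8 and frequency_rank >= 0.8 and monetary_rank >= 0.8:
        return "VIP"
    if recency_rank >= 0.6 and monetary_rank >= 0.7:
        return "High Value"
    if frequency_rank >= 0.7:
        return "Engaged"
    if recency_days > 180:
        return "At Risk"
    return "Standard"
```

Keeping the thresholds in one tested function (or a config table) prevents the SQL and any application-side copy from drifting apart.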

REST API Serving via Databricks SQL: Expose Gold tables through endpoints for application consumption.


from databricks.sql import connect
from flask import Flask, jsonify, request
import logging

app = Flask(__name__)
logger = logging.getLogger(__name__)

# Initialize Databricks SQL connection (placeholder values; load real
# credentials from environment variables or a secret scope)
connection = connect(
    server_hostname="adb-xxxx.cloud.databricks.com",
    http_path="/sql/1.0/endpoints/xxxx",
    access_token="dapi-xxxxx"
)

@app.route("/api/v1/customer-ltv/<customer_id>", methods=["GET"])
def get_customer_ltv(customer_id: str):
    """
    REST endpoint: return customer lifetime value metrics
    Used by mobile app for personalized offers
    """
    try:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT
                customer_id,
                customer_name,
                customer_segment,
                ROUND(SUM(net_revenue), 2) as lifetime_value,
                COUNT(DISTINCT order_id) as order_count,
                MAX(order_date) as last_purchase_date,
                DATEDIFF(CURRENT_DATE, MAX(order_date)) as days_since_purchase,
                ROUND(AVG(net_revenue), 2) as avg_order_value
            FROM gold.fact_sales
            WHERE customer_id = :customer_id
                AND order_status = 'delivered'
            GROUP BY customer_id, customer_name, customer_segment
        """, {"customer_id": customer_id})

        result = cursor.fetchone()
        cursor.close()

        if result:
            return jsonify({
                "customer_id": result[0],
                "name": result[1],
                "segment": result[2],
                "lifetime_value": float(result[3]),
                "total_orders": int(result[4]),
                "last_purchase_date": result[5].isoformat(),
                "days_since_purchase": int(result[6]),
                "avg_order_value": float(result[7])
            }), 200
        else:
            return jsonify({"error": "Customer not found"}), 404

    except Exception as e:
        logger.error(f"Error querying LTV for customer {customer_id}: {str(e)}")
        return jsonify({"error": "Internal server error"}), 500

@app.route("/api/v1/revenue-forecast/<segment>", methods=["GET"])
def get_revenue_forecast(segment: str):
    """
    REST endpoint: return the trailing 30-day revenue trend for a segment
    (daily actuals plus a 7-day rolling average), consumed by the finance
    system as a forecasting input
    """
    try:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT
                order_date,
                customer_segment,
                ROUND(SUM(net_revenue), 2) as daily_revenue,
                ROUND(AVG(SUM(net_revenue)) OVER (
                    PARTITION BY customer_segment
                    ORDER BY order_date
                    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
                ), 2) as revenue_7day_avg,
                COUNT(order_id) as transaction_count
            FROM gold.fact_sales
            WHERE customer_segment = :segment
                AND order_date >= DATE_SUB(CURRENT_DATE, 30)
                AND order_status IN ('shipped', 'delivered')
            GROUP BY order_date, customer_segment
            ORDER BY order_date DESC
        """, {"segment": segment})

        rows = cursor.fetchall()
        cursor.close()

        data = [{
            "date": str(row[0]),
            "segment": row[1],
            "daily_revenue": float(row[2]),
            "rolling_7day_avg": float(row[3]),
            "transactions": int(row[4])
        } for row in rows]

        return jsonify({"forecast": data}), 200

    except Exception as e:
        logger.error(f"Error forecasting for segment {segment}: {str(e)}")
        return jsonify({"error": "Internal server error"}), 500

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=False)
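The connection snippet above hardcodes placeholder credentials for brevity; in a real deployment they should come from environment variables or a Databricks secret scope. A small sketch of that lookup (the variable names are assumptions, not a fixed convention):

```python
import os

def load_sql_credentials(env=None):
    """Collect Databricks SQL connection settings from the environment.

    Fails fast with a clear message if anything is missing, instead of
    surfacing an opaque auth error on the first query.
    """
    env = os.environ if env is None else env
    required = ("DATABRICKS_HOST", "DATABRICKS_HTTP_PATH", "DATABRICKS_TOKEN")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "server_hostname": env["DATABRICKS_HOST"],
        "http_path": env["DATABRICKS_HTTP_PATH"],
        "access_token": env["DATABRICKS_TOKEN"],
    }
```

The returned dict can be splatted straight into `connect(**load_sql_credentials())`.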

8. Incremental Aggregation

Rebuilding multi-billion row aggregate tables nightly is unsustainable. Incremental patterns process only new or changed data and merge the results into existing aggregates, which typically cuts compute cost by an order of magnitude while preserving freshness.


from delta.tables import DeltaTable
from pyspark.sql.functions import (
    avg, col, count, current_timestamp, when, window,
    sum as spark_sum, max as spark_max
)

# ============================================================================
# Incremental pattern: MERGE INTO for fact tables
# ============================================================================

def incremental_load_fact_sales():
    """
    Incremental merge: only new/updated orders since last run.
    Uses order_id as merge key; updates existing, inserts new.
    """

    # Determine the incremental high-water mark. MAX(_updated_at) returns
    # None when the Gold table is empty (first run), so fall back to a
    # timestamp that admits every source row.
    last_update = (
        spark.read.table("gold.fact_sales")
        .agg(spark_max("_updated_at"))
        .collect()[0][0]
    ) or "1970-01-01 00:00:00"

    # Read newly processed orders from the Silver layer
    new_orders = (
        spark.read.table("silver.orders")
        .filter(col("_processed_date") >= last_update)
    )

    # Denormalize with dimensions
    new_fact_data = (
        new_orders
        .join(spark.read.table("silver.customers"), "customer_id", "left")
        .join(spark.read.table("silver.products"), "product_id", "left")
        .select(
            col("order_id"),
            col("order_date"),
            col("customer_id"),
            col("customer_name"),
            col("customer_segment"),
            col("product_id"),
            col("product_name"),
            col("category"),
            col("quantity"),
            col("unit_price"),
            (col("quantity") * col("unit_price")).alias("gross_revenue"),
            col("discount"),
            (col("quantity") * col("unit_price") * (1 - col("discount")))
                .alias("net_revenue"),
            col("cost_of_goods"),
            current_timestamp().alias("_updated_at"),
            col("year"),
            col("month")
        )
    )

    # Merge into existing Gold fact table
    delta_table = DeltaTable.forName(spark, "gold.fact_sales")

    (delta_table
     .alias("target")
     .merge(
        new_fact_data.alias("source"),
        "target.order_id = source.order_id"
     )
     .whenMatchedUpdateAll(
        condition=col("source._updated_at") > col("target._updated_at")
     )
     .whenNotMatchedInsertAll()
     .execute())

    print("Incremental fact_sales merge completed")

# ============================================================================
# Incremental aggregates: Upsert daily summaries
# ============================================================================

def incremental_daily_aggregates():
    """
    Incremental daily aggregates.
    Compute only for dates with new/changed fact data.
    MERGE results into aggregate table.
    """

    # Get max date already aggregated; None on the first run, so fall back
    # to a date that admits all fact rows
    max_agg_date = (
        spark.read.table("gold.gold_finance_daily_revenue")
        .agg(spark_max("revenue_date"))
        .collect()[0][0]
    ) or "1970-01-01"

    # Compute aggregates for new dates only
    new_aggregates = (
        spark.read.table("gold.fact_sales")
        .filter(col("order_date") > max_agg_date)
        .filter(col("order_status") != "cancelled")
        .groupBy(
            col("order_date").alias("revenue_date"),
            col("customer_segment"),
            col("category")
        )
        .agg(
            spark_sum("net_revenue").alias("total_revenue"),
            spark_sum("gross_profit").alias("total_profit"),
            spark_sum("quantity").alias("total_units"),
            count("order_id").alias("transaction_count"),
            avg("profit_margin").alias("avg_profit_margin"),
            current_timestamp().alias("_computed_at")
        )
    )

    # Merge into aggregate table
    delta_agg = DeltaTable.forName(spark, "gold.gold_finance_daily_revenue")

    (delta_agg
     .alias("target")
     .merge(
        new_aggregates.alias("source"),
        (col("target.revenue_date") == col("source.revenue_date")) &
        (col("target.customer_segment") == col("source.customer_segment")) &
        (col("target.category") == col("source.category"))
     )
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())

    print(f"Incremental aggregates merged: {new_aggregates.count()} rows")

# ============================================================================
# Streaming aggregations with watermarks
# ============================================================================

def streaming_revenue_aggregation():
    """
    Real-time revenue aggregation from streaming fact events.
    Uses event-time watermarks to handle late-arriving data.
    Micro-batches are triggered every 10 seconds.
    """

    streaming_facts = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "dbfs:/streaming/schema")
        .load("dbfs:/streaming/sales-events/")
    )

    # Watermark handles events arriving up to 5 minutes late.
    # Auto Loader infers JSON fields as strings by default, so cast the
    # event time explicitly before applying the watermark.
    aggregated = (
        streaming_facts
        .withColumn("event_timestamp", col("event_timestamp").cast("timestamp"))
        .withWatermark("event_timestamp", "5 minutes")
        .groupBy(
            window(col("event_timestamp"), "1 minute", "30 seconds"),
            col("customer_segment"),
            col("product_category")
        )
        .agg(
            spark_sum("revenue").alias("period_revenue"),
            count("order_id").alias("period_transactions"),
            spark_max("event_timestamp").alias("max_event_time")
        )
    )

    # Write to Gold in append mode; the checkpoint provides exactly-once
    # delivery across restarts
    query = (
        aggregated
        .writeStream
        .format("delta")
        .option("checkpointLocation", "dbfs:/streaming/checkpoint/revenue")
        .option("mergeSchema", "true")
        .outputMode("append")
        .trigger(processingTime="10 seconds")
        .start("dbfs:/gold/streaming_revenue_aggregates")
    )

    return query

# Execute incremental loads
incremental_load_fact_sales()
incremental_daily_aggregates()

print("Incremental loading completed")
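The window(col, "1 minute", "30 seconds") call in the streaming aggregation assigns each event to overlapping sliding windows, which is easy to misread. A pure-Python sketch of the assignment rule, using integer seconds and epoch-aligned window starts as Spark does (the function name is illustrative):

```python
def sliding_windows(event_ts, window_s=60, slide_s=30):
    """Return the sorted [start, end) windows containing event_ts.

    Windows start at every multiple of slide_s, so each event falls into
    window_s / slide_s windows -- two, for a 1-minute window sliding by 30s.
    """
    windows = []
    start = (event_ts // slide_s) * slide_s  # latest window start <= event_ts
    while start > event_ts - window_s:
        windows.append((start, start + window_s))
        start -= slide_s
    return sorted(windows)
```

An event at t = 75s therefore lands in both the [30, 90) and [60, 120) windows, and its revenue contributes to the aggregate row of each.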

9. Data Mesh & Domain Ownership

Modern data platforms adopt data mesh principles: organizing Gold tables by business domain with clear ownership and cross-domain contracts. Each domain team owns their Gold tables, maintains SLAs, and publishes data products for other teams.


# File: gold_schema_management.py
# Organizing Gold tables by domain with Unity Catalog


# ============================================================================
# Domain: Finance
# Owner: CFO-Analytics team, SLA: 2-hour freshness
# ============================================================================

spark.sql("""
    CREATE SCHEMA IF NOT EXISTS gold_finance
    COMMENT 'Finance domain Gold tables: revenue, costs, profitability, forecasts'
    WITH DBPROPERTIES (
        'owner' = 'finance-analytics@company.com',
        'sla_freshness_hours' = '2',
        'refresh_frequency' = 'daily_06:00_utc',
        'contact' = 'analytics-team@company.com',
        'data_classification' = 'confidential',
        'pii_present' = 'false',
        'domain' = 'finance',
        'critical_business_process' = 'revenue_reporting'
    );
""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS gold_finance.fact_revenue (
        revenue_id BIGINT NOT NULL COMMENT 'Unique revenue transaction ID',
        revenue_date DATE NOT NULL COMMENT 'Date revenue recognized',
        customer_id BIGINT NOT NULL COMMENT 'FK to gold_marketing.dim_customer',
        product_id BIGINT NOT NULL COMMENT 'FK to gold_product.dim_product',
        gross_revenue DECIMAL(15, 2) NOT NULL COMMENT 'Revenue before discounts',
        discounts DECIMAL(15, 2) COMMENT 'Total discounts applied',
        net_revenue DECIMAL(15, 2) NOT NULL COMMENT 'Revenue after discounts (KPI)',
        cost_of_goods DECIMAL(15, 2) NOT NULL COMMENT 'COGS for margin calc',
        gross_profit DECIMAL(15, 2) NOT NULL COMMENT 'net_revenue - cogs',
        tax_amount DECIMAL(15, 2) COMMENT 'Tax withheld',
        net_income DECIMAL(15, 2) COMMENT 'After-tax profit',
        _updated_at TIMESTAMP NOT NULL COMMENT 'Row last updated',
        _updated_date DATE GENERATED ALWAYS AS (CAST(_updated_at AS DATE))
    )
    USING DELTA
    PARTITIONED BY (revenue_date)
    COMMENT 'Core revenue fact table. Public contract: customer_id, product_id, net_revenue, revenue_date'
    TBLPROPERTIES (
        'owner' = 'finance-analytics',
        'refresh_sla_minutes' = '120',
        'refresh_time_utc' = '08:00',
        'pii_columns' = 'none',
        'sensitivity' = 'confidential',
        'metric_definitions' = 'net_revenue=gross_revenue - discounts, gross_profit=net_revenue - cost_of_goods',
        'public_contract' = 'customer_id, product_id, net_revenue, revenue_date, gross_profit',
        'version' = '2.0',
        'breaking_changes_since_v1' = 'Added tax_amount, net_income columns'
    );
""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS gold_finance.monthly_revenue_summary (
        summary_month DATE NOT NULL COMMENT 'First day of revenue month',
        total_revenue DECIMAL(15, 2) NOT NULL,
        total_cost DECIMAL(15, 2) NOT NULL,
        total_profit DECIMAL(15, 2) NOT NULL,
        margin_pct DECIMAL(5, 2) NOT NULL,
        transaction_count BIGINT NOT NULL,
        customer_count BIGINT NOT NULL,
        _computed_at TIMESTAMP NOT NULL
    )
    USING DELTA
    PARTITIONED BY (summary_month)
    COMMENT 'Pre-aggregated monthly revenue for executive dashboards'
    TBLPROPERTIES (
        'owner' = 'finance-analytics',
        'grain' = 'monthly',
        'refresh_frequency' = 'monthly',
        'retention_years' = '7',
        'gdpr_relevant' = 'false'
    );
""")

# ============================================================================
# Domain: Marketing
# Owner: Marketing Analytics team, SLA: 4-hour freshness
# ============================================================================

spark.sql("""
    CREATE SCHEMA IF NOT EXISTS gold_marketing
    COMMENT 'Marketing domain: campaigns, customer acquisition, attribution'
    WITH DBPROPERTIES (
        'owner' = 'marketing-analytics@company.com',
        'sla_freshness_hours' = '4',
        'contact' = 'marketing-team@company.com',
        'pii_present' = 'true',
        'data_classification' = 'confidential'
    );
""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS gold_marketing.dim_customer (
        customer_id BIGINT NOT NULL PRIMARY KEY,
        customer_name STRING NOT NULL,
        email STRING COMMENT 'PII: customer email',
        country STRING,
        segment STRING COMMENT 'Business segment assignment',
        lifetime_value DECIMAL(15, 2) COMMENT 'Total revenue from customer',
        acquisition_channel STRING COMMENT 'First touch attribution',
        acquisition_date DATE,
        _updated_at TIMESTAMP NOT NULL,
        _valid_from TIMESTAMP NOT NULL COMMENT 'SCD Type 2: when this version became effective',
        _valid_to TIMESTAMP COMMENT 'SCD Type 2: when this version was superseded (NULL = current)'
    )
    USING DELTA
    COMMENT 'Slowly Changing Dimension Type 2: customer attributes with history'
    TBLPROPERTIES (
        'owner' = 'marketing-analytics',
        'scd_type' = '2',
        'pii_columns' = 'customer_name,email',
        'refresh_frequency' = 'daily',
        'shared_across_domains' = 'true',
        'public_contract' = 'customer_id, customer_name, segment, lifetime_value'
    );
""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS gold_marketing.campaign_performance (
        campaign_id BIGINT NOT NULL,
        campaign_date DATE NOT NULL,
        campaign_name STRING NOT NULL,
        channel STRING COMMENT 'Marketing channel: email, social, paid_search',
        impressions BIGINT,
        clicks BIGINT,
        conversions BIGINT,
        revenue_attributed DECIMAL(15, 2),
        cost DECIMAL(15, 2),
        roi DECIMAL(5, 2) COMMENT 'Return on investment',
        cac DECIMAL(10, 2) COMMENT 'Customer acquisition cost',
        ltv_to_cac_ratio DECIMAL(5, 2),
        _updated_at TIMESTAMP NOT NULL
    )
    USING DELTA
    PARTITIONED BY (campaign_date)
    COMMENT 'Campaign-level attribution and performance metrics'
    TBLPROPERTIES (
        'owner' = 'marketing-analytics',
        'grain' = 'daily_by_campaign',
        'refresh_sla_minutes' = '240',
        'metric_definitions' = 'roi=(revenue_attributed - cost) / cost, cac=cost / conversions'
    );
""")

# ============================================================================
# Domain: Product
# Owner: Product Analytics team, SLA: 1-hour freshness (real-time KPIs)
# ============================================================================

spark.sql("""
    CREATE SCHEMA IF NOT EXISTS gold_product
    COMMENT 'Product domain: usage, features, user behavior'
    WITH DBPROPERTIES (
        'owner' = 'product-analytics@company.com',
        'sla_freshness_hours' = '1',
        'refresh_frequency' = 'continuous',
        'critical_dashboards' = 'product-health-real-time, feature-adoption'
    );
""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS gold_product.fact_user_events (
        event_id BIGINT NOT NULL,
        event_timestamp TIMESTAMP NOT NULL,
        user_id BIGINT NOT NULL,
        event_type STRING NOT NULL COMMENT 'page_view, button_click, form_submit',
        feature_flag STRING COMMENT 'A/B test or feature flag active',
        session_id STRING,
        device_type STRING,
        country STRING,
        _updated_at TIMESTAMP NOT NULL,
        event_date DATE GENERATED ALWAYS AS (CAST(event_timestamp AS DATE))
    )
    USING DELTA
    PARTITIONED BY (event_date)
    COMMENT 'Real-time user event stream for product analytics and funnel analysis'
    TBLPROPERTIES (
        'owner' = 'product-analytics',
        'refresh_frequency' = 'real-time',
        'latency_sla_seconds' = '60',
        'retention_days' = '90',
        'pii_columns' = 'user_id,session_id',
        'shared_with_domains' = 'marketing,data-science'
    );
""")

# ============================================================================
# Cross-domain shared dimension: Date dimension (single source of truth)
# ============================================================================

spark.sql("""
    CREATE SCHEMA IF NOT EXISTS gold_common
    COMMENT 'Shared dimensions and common tables used across all domains'
    WITH DBPROPERTIES (
        'owner' = 'data-engineering@company.com',
        'sla_freshness_hours' = '24',
        'shared_across_all_domains' = 'true'
    );
""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS gold_common.dim_date (
        date_id INT NOT NULL PRIMARY KEY,
        date_value DATE NOT NULL COMMENT 'One row per calendar date',
        year INT,
        quarter INT,
        month INT,
        day_of_month INT,
        day_of_week INT,
        week_of_year INT,
        is_weekend BOOLEAN,
        is_holiday BOOLEAN,
        holiday_name STRING,
        fiscal_year INT,
        fiscal_quarter INT,
        fiscal_month INT
    )
    USING DELTA
    COMMENT 'Conformed dimension: single date dimension for all domains'
    TBLPROPERTIES (
        'owner' = 'data-engineering',
        'shared_across_domains' = 'true',
        'version' = '1.0',
        'note' = 'All domains must reference this table for time dimensions'
    );
""")

print("Domain-organized Gold schemas created with proper TBLPROPERTIES")
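gold_common.dim_date is usually populated once from a generated calendar rather than from source data. A minimal pure-Python sketch of the calendar attributes (fiscal and holiday columns are omitted because they depend on company policy; the YYYYMMDD date_id convention is an assumption):

```python
from datetime import date, timedelta

def build_dim_date_rows(start, end):
    """Generate one calendar-attribute row per day in [start, end]."""
    rows = []
    current = start
    while current <= end:
        rows.append({
            "date_id": current.year * 10000 + current.month * 100 + current.day,
            "date_value": current,
            "year": current.year,
            "quarter": (current.month - 1) // 3 + 1,
            "month": current.month,
            "day_of_month": current.day,
            "day_of_week": current.isoweekday(),   # 1 = Monday ... 7 = Sunday
            "week_of_year": current.isocalendar()[1],
            "is_weekend": current.isoweekday() >= 6,
        })
        current += timedelta(days=1)
    return rows
```

A spark.createDataFrame over even a few decades of such rows is small enough to rebuild from scratch whenever the definition changes.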

10. Production Patterns

Production-grade Gold layers require rigorous naming conventions, comprehensive documentation, automated freshness monitoring, and cost optimization. These patterns ensure reliability, maintainability, and cost efficiency at scale.


# File: gold_production_patterns.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, current_timestamp, sum as spark_sum,
    count, max as spark_max, min as spark_min,
    datediff, when, current_date
)
from delta.tables import DeltaTable
from databricks.sdk import WorkspaceClient
import logging
from datetime import datetime, timedelta

spark = SparkSession.builder \
    .appName("GoldProductionPatterns") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .getOrCreate()

logger = logging.getLogger(__name__)

# ============================================================================
# Pattern 1: Naming conventions and table documentation
# ============================================================================

def create_gold_table_with_full_metadata(
    schema_name: str,
    table_name: str,
    dataframe,
    owner_email: str,
    refresh_frequency: str,
    sla_hours: int,
    description: str,
    pii_columns: list = None,
    metric_definitions: dict = None
):
    """
    Create Gold table with comprehensive production metadata.
    Naming: gold_{domain}_{entity}_{metric}_v{major}.{minor}

    Args:
        schema_name: Domain schema (gold_finance, gold_marketing, etc.)
        table_name: Base table name (fact_revenue, dim_customer, daily_kpis)
        dataframe: PySpark DataFrame to persist
        owner_email: Team responsible for SLA
        refresh_frequency: 'hourly', 'daily', 'weekly', 'monthly'
        sla_hours: Freshness SLA in hours
        description: Business description for documentation
        pii_columns: List of PII column names
        metric_definitions: Dict mapping metric name to calculation formula
    """

    full_table_name = f"{schema_name}.{table_name}"
    version = "1.0"

    # Write as Delta. Note: ZORDER is not a DataFrameWriter option; apply
    # clustering afterwards with OPTIMIZE ... ZORDER BY (see Pattern 3)
    dataframe.write \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .format("delta") \
        .save(f"dbfs:/gold/{schema_name}/{table_name}")

    # Create table with full metadata
    metric_defs_str = ", ".join([
        f"{k}={v}" for k, v in (metric_definitions or {}).items()
    ]) if metric_definitions else "none"

    pii_str = ", ".join(pii_columns) if pii_columns else "none"

    create_sql = f"""
    CREATE TABLE IF NOT EXISTS {full_table_name}
    USING DELTA
    LOCATION 'dbfs:/gold/{schema_name}/{table_name}'
    COMMENT '{description}'
    TBLPROPERTIES (
        'owner' = '{owner_email}',
        'created_at' = '{datetime.now().isoformat()}',
        'refresh_frequency' = '{refresh_frequency}',
        'sla_freshness_hours' = '{sla_hours}',
        'sla_breach_contact' = '{owner_email}',
        'pii_columns' = '{pii_str}',
        'metric_definitions' = '{metric_defs_str}',
        'version' = '{version}',
        'table_type' = 'gold',
        'retention_days' = '2555',
        'lineage' = 'silver -> gold',
        'contains_pii' = '{"true" if pii_columns else "false"}',
        'gdpr_relevant' = '{"true" if pii_columns else "false"}',
        'data_quality_sla' = 'null_check,uniqueness_check,freshness_check'
    )
    """

    spark.sql(create_sql)
    logger.info(f"Created Gold table {full_table_name} with version {version}")

    return full_table_name

# ============================================================================
# Pattern 2: Automated freshness monitoring and alerting
# ============================================================================

def monitor_gold_freshness(schema_name: str, alert_email: str):
    """
    Monitor all Gold tables in a schema for SLA breaches.
    Query INFORMATION_SCHEMA and table properties.
    """

    # Get all tables in the schema (managed and external)
    tables = spark.sql(f"""
        SELECT table_name, table_type
        FROM information_schema.tables
        WHERE table_schema = '{schema_name}'
            AND table_type IN ('MANAGED', 'EXTERNAL')
    """).collect()

    freshness_report = []
    breached_tables = []

    for table_row in tables:
        table_name = table_row[0]
        full_table = f"{schema_name}.{table_name}"

        try:
            # Get table properties
            properties = spark.sql(f"SHOW TBLPROPERTIES {full_table}").collect()
            props_dict = {row[0]: row[1] for row in properties}

            sla_hours = int(props_dict.get("sla_freshness_hours", "24"))
            owner = props_dict.get("owner", "unknown")
            refresh_freq = props_dict.get("refresh_frequency", "unknown")

            # Get max update timestamp
            max_update = spark.sql(f"""
                SELECT MAX(_updated_at) FROM {full_table}
            """).collect()[0][0]

            if max_update:
                hours_since_update = (
                    datetime.now() - max_update.replace(tzinfo=None)
                ).total_seconds() / 3600

                is_breached = hours_since_update > sla_hours

                freshness_report.append({
                    "table": full_table,
                    "owner": owner,
                    "refresh_frequency": refresh_freq,
                    "sla_hours": sla_hours,
                    "hours_since_update": round(hours_since_update, 2),
                    "sla_breached": is_breached,
                    "last_update": max_update.isoformat()
                })

                if is_breached:
                    breached_tables.append({
                        "table": full_table,
                        "owner": owner,
                        "hours_overdue": round(hours_since_update - sla_hours, 2)
                    })

        except Exception as e:
            logger.warning(f"Error monitoring {full_table}: {str(e)}")

    # Log freshness report
    logger.info(f"Freshness Report for {schema_name}:")
    for report in freshness_report:
        status = "BREACHED" if report["sla_breached"] else "OK"
        logger.info(f"  {report['table']}: {status} ({report['hours_since_update']}h old)")

    # Alert if breaches detected
    if breached_tables:
        alert_message = f"SLA BREACH DETECTED in {schema_name}:\n"
        for breach in breached_tables:
            alert_message += f"  - {breach['table']}: {breach['hours_overdue']}h overdue\n"
            alert_message += f"    Contact: {breach['owner']}\n"

        logger.error(alert_message)
        # In production: call email API or Slack webhook
        # send_alert_email(alert_email, alert_message)

    return freshness_report, breached_tables

# ============================================================================
# Pattern 3: Table OPTIMIZE and ZORDER for BI query patterns
# ============================================================================

def optimize_gold_table_for_queries(table_name: str, z_order_columns: list):
    """
    Optimize Delta table for BI query patterns using ZORDER clustering.
    Run daily/weekly as part of maintenance.

    ZORDER on columns like (customer_id, product_id) co-locates related rows
    in the same files, so filters on those columns can skip most data files.
    """

    z_order_clause = ", ".join(z_order_columns)

    optimize_sql = f"""
    OPTIMIZE {table_name}
    ZORDER BY ({z_order_clause})
    """

    start_time = datetime.now()
    spark.sql(optimize_sql)
    elapsed = (datetime.now() - start_time).total_seconds()

    logger.info(f"Optimized {table_name} in {elapsed:.2f}s with ZORDER by {z_order_clause}")

    # Get file stats after optimization
    stats = spark.sql(f"DESCRIBE DETAIL {table_name}").collect()[0]

    return {
        "table": table_name,
        "optimization_seconds": elapsed,
        "num_files": stats["numFiles"],
        "total_size_bytes": stats["sizeInBytes"]
    }

# ============================================================================
# Pattern 4: Cost management with serverless SQL warehouse auto-scaling
# ============================================================================

def configure_gold_warehouse_for_bi(warehouse_id: str, tag: str = "gold-bi"):
    """
    Configure Databricks SQL warehouse for Gold layer BI queries.
    Auto-scaling, serverless, with cost controls.
    """

    client = WorkspaceClient()

    warehouse_config = {
        "name": f"gold-bi-{tag}",
        "cluster_size": "2XL",
        "min_num_clusters": 1,
        "max_num_clusters": 8,
        "auto_stop_mins": 15,
        "enable_serverless_compute": True,
        "channel": "CHANNEL_NAME_CURRENT",
        "tags": {
            "environment": "production",
            "layer": "gold",
            "use_case": "bi",
            "cost_center": "analytics"
        }
    }

    # Sketch only: in production, apply this configuration through the
    # SDK's SQL warehouses API rather than just logging it
    logger.info(f"Warehouse configuration: {warehouse_config}")
    return warehouse_config

# ============================================================================
# Pattern 5: Data quality checks and validation rules at Gold layer
# ============================================================================

def validate_gold_table_quality(table_name: str, checks: dict):
    """
    Run data quality validation on Gold table.
    Checks include: nullness, uniqueness, freshness, value ranges.

    Args:
        table_name: Full table name (schema.table)
        checks: Dict of {column: {check_type: threshold}}
    """

    validation_results = {}

    for column, check_config in checks.items():
        if "null_pct_max" in check_config:
            null_check = spark.sql(f"""
                SELECT
                    COUNT(*) as total_rows,
                    COUNT({column}) as non_null_rows,
                    ROUND(100 * (1 - COUNT({column}) / COUNT(*)), 2) as null_pct
                FROM {table_name}
            """).collect()[0]

            null_pct = null_check[2]
            passed = null_pct <= check_config["null_pct_max"]

            validation_results[f"{column}_nullness"] = {
                "passed": passed,
                "null_pct": null_pct,
                "threshold": check_config["null_pct_max"]
            }

        if "min_value" in check_config or "max_value" in check_config:
            range_check = spark.sql(f"""
                SELECT
                    MIN({column}) as min_val,
                    MAX({column}) as max_val
                FROM {table_name}
            """).collect()[0]

            min_val, max_val = range_check[0], range_check[1]

            min_passed = (min_val is None) or (min_val >= check_config.get("min_value", float('-inf')))
            max_passed = (max_val is None) or (max_val <= check_config.get("max_value", float('inf')))

            validation_results[f"{column}_range"] = {
                "passed": min_passed and max_passed,
                "actual_min": min_val,
                "actual_max": max_val,
                "expected_range": {
                    "min": check_config.get("min_value"),
                    "max": check_config.get("max_value")
                }
            }

    # Log validation results
    logger.info(f"Quality validation for {table_name}:")
    all_passed = True
    for check_name, result in validation_results.items():
        status = "PASS" if result["passed"] else "FAIL"
        logger.info(f"  {check_name}: {status}")
        if not result["passed"]:
            all_passed = False

    return validation_results, all_passed

# Execute production patterns
quality_checks = {
    "net_revenue": {"null_pct_max": 0.1, "min_value": 0},
    "customer_id": {"null_pct_max": 0, "min_value": 1},
    "order_date": {"null_pct_max": 0}
}

validation_results, all_passed = validate_gold_table_quality(
    "gold.fact_sales",
    quality_checks
)

freshness_report, breaches = monitor_gold_freshness("gold_finance", "analytics@company.com")

optimize_stats = optimize_gold_table_for_queries(
    "gold.fact_sales",
    ["customer_id", "product_id", "order_date"]
)

print("Production patterns executed: quality validation, freshness monitoring, optimization")
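The naming convention from Pattern 1 (gold_{domain}_{entity}...) can also be enforced mechanically, for example in a CI check that scans new table DDL. A sketch under stated assumptions; the exact regex is illustrative and should be adapted to your own standard:

```python
import re

# Lowercase snake_case, gold_ prefix, at least a domain segment and an
# entity segment. Illustrative rule, not an official Databricks convention.
GOLD_NAME_PATTERN = re.compile(r"^gold_[a-z0-9]+(_[a-z0-9]+)+$")

def is_valid_gold_table_name(name: str) -> bool:
    """True if name follows the gold_{domain}_{entity}... convention."""
    return bool(GOLD_NAME_PATTERN.match(name))
```

Run against information_schema.tables output, this catches names like tbl_agg_005_v3 before they reach production.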

Conclusion: The Gold Layer transforms raw and clean data into actionable business intelligence. By following dimensional modeling, implementing incremental patterns, organizing by domain, and embedding production-grade monitoring, organizations build reliable, scalable analytics platforms. Gold tables serve as the single source of truth for metrics, enabling self-service analytics while maintaining data governance and cost efficiency.