Bronze Layer - Medallion Architecture

1. Overview

The Bronze layer is the foundational tier of the Medallion (Lakehouse) architecture, serving as the raw data landing zone for all ingested data. It operates on a "schema-on-read" philosophy, prioritizing raw data preservation over immediate transformation and cleansing. The Bronze layer captures data exactly as it arrives from source systems, maintaining full fidelity for audit trails, reprocessing capabilities, and forensic analysis.

The primary purposes of the Bronze layer are:

- Preserve source data in its original form for audit and compliance
- Enable reprocessing of downstream layers without re-extracting from source systems
- Decouple ingestion from transformation so each can evolve independently
- Provide a single, consistent landing zone for all ingested data

Bronze tables are typically write-heavy and append-only, designed for high-throughput streaming and batch ingestion. They trade query optimization for ingestion speed and data integrity.

2. Design Principles

Effective Bronze layer design follows these core principles:

Append-Only Ingestion

Bronze tables use insert-only patterns to maintain immutability and enable efficient distributed writes. New data is appended in each run; deletes and updates are avoided to prevent partition rewrites and transaction conflicts.
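
Append-only intent can also be enforced at the table level. Delta Lake's delta.appendOnly table property rejects UPDATE, DELETE, and row-modifying MERGE operations (the table name below is illustrative):

```sql
-- Reject any operation that modifies or deletes existing rows;
-- only appends are allowed once this property is set.
ALTER TABLE main.raw.bronze_events
SET TBLPROPERTIES ('delta.appendOnly' = 'true');
```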

Immutability and Write-Once Semantics

Once written, Bronze records should not be modified. This enables:

- Reliable audit trails and forensic analysis of exactly what arrived
- Safe reprocessing: downstream layers can be rebuilt from unchanged raw data
- Reproducible reads of historical table versions via Delta time travel
- Conflict-free concurrent and distributed writes
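
Because Bronze versions are never rewritten, Delta time travel can reproduce the table exactly as it looked at any earlier point. A sketch (the version number and timestamp are illustrative):

```sql
-- Inspect the table's commit history
DESCRIBE HISTORY main.raw.bronze_events;

-- Re-read the table as of an earlier version or timestamp,
-- e.g. to audit an old report or replay a Silver rebuild
SELECT * FROM main.raw.bronze_events VERSION AS OF 42;
SELECT * FROM main.raw.bronze_events TIMESTAMP AS OF '2026-04-01T00:00:00';
```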

Minimal Transformation

Bronze ingestion applies only technical metadata enrichment, not business logic. Parsing, validation, and cleansing happen at Silver and Gold layers. This separation enables independent evolution of transformation logic and raw data retention.

Metadata Enrichment

Every Bronze record should include:

- _ingestion_timestamp: when the record landed in Bronze
- _source_file: the originating file or stream (e.g. kafka://topic)
- _batch_id: a unique identifier for the ingestion run
- _ingestion_date: the partition column, derived from the ingestion timestamp

Partitioning Strategy

Bronze tables are partitioned by ingestion date (_ingestion_date) to:

- Enable partition pruning for date-range queries
- Keep partition counts bounded and file listing operations fast
- Simplify day-level retention, compaction, and reprocessing

3. Ingestion with Auto Loader

Auto Loader (the Databricks cloudFiles source) is the preferred mechanism for Bronze ingestion from cloud storage. It provides automatic schema inference, schema evolution, exactly-once file processing, and incremental ingestion.

Streaming Auto Loader from S3/ADLS


from pyspark.sql.functions import (
    col, current_timestamp, input_file_name, lit
)
import uuid

# Configuration
source_path = "s3://my-data-bucket/events/"
checkpoint_path = "/mnt/delta/checkpoints/bronze_events"
target_table = "main.raw.bronze_events"
batch_id = str(uuid.uuid4())

# Ensure the target catalog and schema exist
spark.sql("CREATE CATALOG IF NOT EXISTS main")
spark.sql("CREATE SCHEMA IF NOT EXISTS main.raw")

# Auto Loader with streaming (continuous ingestion)
def ingest_events_streaming():
    """
    Read events from S3 using Auto Loader with schema evolution.
    """
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/mnt/delta/schema_hints/events")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("cloudFiles.rescuedDataColumn", "_rescue_data")
        .option("pathGlobFilter", "*.json")
        .option("cloudFiles.maxFilesPerTrigger", 100)
        .load(source_path)
    )

    # Add Bronze metadata columns
    enriched_df = (
        df
        .withColumn("_ingestion_timestamp", current_timestamp())
        .withColumn("_source_file", input_file_name())
        .withColumn("_batch_id", lit(batch_id))
        .withColumn("_ingestion_date",
                   col("_ingestion_timestamp").cast("date"))
    )

    # Write to Delta Bronze table with checkpointing; toTable() starts the query
    (
        enriched_df
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .partitionBy("_ingestion_date")
        .toTable(target_table)
        .awaitTermination()
    )

if __name__ == "__main__":
    ingest_events_streaming()

Trigger-Once Pattern for Scheduled Ingestion

For scheduled batch jobs, use trigger-once semantics with Databricks Jobs:


from pyspark.sql.functions import (
    col, current_timestamp, input_file_name, lit, to_date
)
import uuid

# Configuration
source_path = "s3://my-data-bucket/events/"
checkpoint_path = "/mnt/delta/checkpoints/bronze_events_batch"
target_table = "main.raw.bronze_events"
batch_id = str(uuid.uuid4())

def ingest_events_batch():
    """
    Run-once batch ingestion with Auto Loader and schema evolution.
    Designed to run on a schedule (e.g. every 5 minutes) via Databricks Jobs.
    """
    try:
        spark.sql("CREATE CATALOG IF NOT EXISTS main")
        spark.sql("CREATE SCHEMA IF NOT EXISTS main.raw")

        df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.schemaLocation", "/mnt/delta/schema_hints/events")
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            .option("cloudFiles.rescuedDataColumn", "_rescue_data")
            .option("cloudFiles.maxFilesPerTrigger", 500)
            .option("cloudFiles.maxBytesPerTrigger", "1g")
            .load(source_path)
        )

        enriched_df = (
            df
            .withColumn("_ingestion_timestamp", current_timestamp())
            .withColumn("_source_file", input_file_name())
            .withColumn("_batch_id", lit(batch_id))
            .withColumn("_ingestion_date",
                       to_date(col("_ingestion_timestamp")))
        )

        # Run once: process all available data, then stop
        query = (
            enriched_df
            .writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", checkpoint_path)
            .option("mergeSchema", "true")
            .partitionBy("_ingestion_date")
            .trigger(availableNow=True)  # batch semantics; respects rate-limit options
            .toTable(target_table)
        )

        query.awaitTermination()
        print(f"Batch {batch_id} completed successfully.")

    except Exception as e:
        print(f"Ingestion failed: {str(e)}")
        raise

if __name__ == "__main__":
    ingest_events_batch()

Schema Hints for Auto Loader

Auto Loader accepts schema hints to pin the types of known columns during inference while still allowing new columns to evolve. Hints are passed as a DDL string via the cloudFiles.schemaHints option, not as a file written to the schema location:


# Schema hints as a DDL string: listed columns get these exact types;
# unlisted columns are still inferred automatically.
events_schema_hints = (
    "event_id STRING, "
    "user_id BIGINT, "
    "event_type STRING, "
    "timestamp STRING, "
    "properties STRING"  # nested JSON kept as a string, parsed later at Silver
)

df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/mnt/delta/schema_hints/events")
    .option("cloudFiles.schemaHints", events_schema_hints)
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://my-data-bucket/events/")
)

# Auto Loader uses the hints as a baseline and adds new columns as they appear

4. Batch Ingestion

For non-streaming sources (periodic exports, SFTP drops, FTP servers), batch ingestion leverages direct read operations with error handling.

CSV Ingestion with Error Handling


from pyspark.sql.functions import (
    col, current_timestamp, input_file_name, lit, to_date
)
import uuid

batch_id = str(uuid.uuid4())
source_path = "s3://my-data-bucket/batch/customer_data/*.csv"
bad_records_path = f"/mnt/delta/quarantine/customer_data/{batch_id}"
target_table = "main.raw.bronze_customers"

def ingest_csv_batch():
    """
    Ingest CSV with permissive mode (captures malformed records).
    """
    try:
        df = (
            spark.read
            .format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("mode", "PERMISSIVE")
            .option("badRecordsPath", bad_records_path)
            .option("multiLine", "true")
            .option("escape", '"')
            .load(source_path)
        )

        # Drop any _corrupt_record column surfaced by PERMISSIVE mode
        df = df.select([c for c in df.columns if not c.startswith("_corrupt")])

        # Add Bronze metadata
        enriched_df = (
            df
            .withColumn("_ingestion_timestamp", current_timestamp())
            .withColumn("_source_file", input_file_name())
            .withColumn("_batch_id", lit(batch_id))
            .withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))
        )

        # Write to Bronze
        enriched_df.write \
            .format("delta") \
            .mode("append") \
            .partitionBy("_ingestion_date") \
            .saveAsTable(target_table)

        # Log bad-record count (badRecordsPath is only written when needed)
        try:
            bad_count = spark.read.json(bad_records_path).count()
        except Exception:
            bad_count = 0
        print(f"Ingested {enriched_df.count()} good records, {bad_count} malformed.")

    except Exception as e:
        print(f"CSV ingestion failed: {str(e)}")
        raise

if __name__ == "__main__":
    ingest_csv_batch()

JSON Lines Ingestion


from pyspark.sql.functions import col, current_timestamp, input_file_name, lit, to_date
import uuid

batch_id = str(uuid.uuid4())
source_path = "s3://my-data-bucket/batch/logs/*.jsonl"
target_table = "main.raw.bronze_logs"

def ingest_jsonl_batch():
    """
    Ingest newline-delimited JSON with schema evolution.
    """
    try:
        df = (
            spark.read
            .format("json")
            .option("inferSchema", "true")
            .option("multiLine", "false")
            .load(source_path)
        )

        enriched_df = (
            df
            .withColumn("_ingestion_timestamp", current_timestamp())
            .withColumn("_source_file", input_file_name())
            .withColumn("_batch_id", lit(batch_id))
            .withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))
        )

        enriched_df.write \
            .format("delta") \
            .mode("append") \
            .partitionBy("_ingestion_date") \
            .option("mergeSchema", "true") \
            .saveAsTable(target_table)

        print(f"Ingested {enriched_df.count()} JSON records.")

    except Exception as e:
        print(f"JSON ingestion failed: {str(e)}")
        raise

if __name__ == "__main__":
    ingest_jsonl_batch()

SQL COPY INTO for Efficient Batch Loading

For high-volume batch ingestion, the SQL COPY INTO command provides efficient file processing with incremental tracking:


-- Create Bronze table if not exists
CREATE TABLE IF NOT EXISTS main.raw.bronze_orders (
  order_id STRING,
  customer_id BIGINT,
  amount DECIMAL(12, 2),
  order_date STRING,
  _ingestion_timestamp TIMESTAMP,
  _source_file STRING,
  _batch_id STRING,
  _ingestion_date DATE
)
USING DELTA
PARTITIONED BY (_ingestion_date);

-- COPY INTO with automatic file tracking (no duplicates on reruns).
-- With 'header' = 'true', source columns are referenced by name.
COPY INTO main.raw.bronze_orders
FROM (
  SELECT
    order_id,
    customer_id,
    amount,
    order_date,
    current_timestamp() AS _ingestion_timestamp,
    _metadata.file_path AS _source_file,
    '2f5a8c3b-1234-5678-9012' AS _batch_id,
    to_date(current_timestamp()) AS _ingestion_date
  FROM 's3://my-data-bucket/batch/orders/'
)
FILEFORMAT = CSV
PATTERN = '*.csv'
FORMAT_OPTIONS (
  'header' = 'true',
  'inferSchema' = 'true',
  'nullValue' = '',
  'mode' = 'PERMISSIVE'
)
COPY_OPTIONS (
  'mergeSchema' = 'true'
);

5. Streaming Ingestion from Kafka

Kafka integration enables real-time Bronze layer population from message brokers with exactly-once semantics.

Kafka Structured Streaming to Delta


from pyspark.sql.functions import (
    col, from_json, current_timestamp, lit, to_date
)
from pyspark.sql.types import (
    StructType, StructField, LongType, StringType as ST
)
import uuid

# Configuration
kafka_bootstrap = "kafka-broker-1.example.com:9092,kafka-broker-2.example.com:9092"
kafka_topic = "events"
checkpoint_path = "/mnt/delta/checkpoints/bronze_kafka_events"
target_table = "main.raw.bronze_kafka_events"
batch_id = str(uuid.uuid4())

# Kafka message schema
kafka_message_schema = StructType([
    StructField("event_id", ST(), True),
    StructField("user_id", LongType(), True),
    StructField("event_type", ST(), True),
    StructField("timestamp", ST(), True),
    StructField("properties", ST(), True),
])

def ingest_kafka_streaming():
    """
    Read from Kafka topic, parse JSON payloads, write to Delta Bronze.
    Includes offset tracking for exactly-once semantics.
    """
    try:
        # Read from Kafka
        df_kafka = (
            spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", kafka_bootstrap)
            .option("subscribe", kafka_topic)
            .option("startingOffsets", "latest")
            .option("maxOffsetsPerTrigger", 10000)
            .option("kafka.group.id", f"bronze-{batch_id}")
            .load()
        )

        # Parse Kafka value (JSON) and add metadata
        df_parsed = (
            df_kafka
            .select(
                col("key").cast(ST()).alias("partition_key"),
                col("offset").alias("_kafka_offset"),
                col("partition").alias("_kafka_partition"),
                col("timestamp").alias("_kafka_timestamp"),
                from_json(col("value").cast(ST()), kafka_message_schema).alias("payload")
            )
            .select(
                col("partition_key"),
                col("_kafka_offset"),
                col("_kafka_partition"),
                col("_kafka_timestamp"),
                col("payload.*")
            )
            .withColumn("_ingestion_timestamp", current_timestamp())
            .withColumn("_source_file", lit(f"kafka://{kafka_topic}"))
            .withColumn("_batch_id", lit(batch_id))
            .withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))
        )

        # Write to Delta with checkpointing; toTable() starts the query
        query = (
            df_parsed
            .writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", checkpoint_path)
            .option("mergeSchema", "true")
            .partitionBy("_ingestion_date")
            .trigger(processingTime="1 minute")
            .toTable(target_table)
        )

        query.awaitTermination()

    except Exception as e:
        print(f"Kafka ingestion failed: {str(e)}")
        raise

if __name__ == "__main__":
    ingest_kafka_streaming()

6. Schema Management

Bronze layers handle schema evolution gracefully to accommodate source system changes without breaking pipelines.

Schema Evolution Strategies

Add New Columns Mode: Auto Loader automatically adds new columns found in arriving data as nullable columns.


# Auto Loader with schema evolution
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaLocation", "/mnt/delta/schema_hints/events")
    .load("s3://my-bucket/events/")
)

# Writing with mergeSchema=true allows schema evolution at write time
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "/mnt/checkpoints/events")
    .toTable("main.raw.bronze_events")
)

Rescue Column for Unmatched Fields

When enabled, Auto Loader captures fields that don't match the detected schema in a _rescue_data column:


# Enable rescue column to capture unexpected fields
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.rescuedDataColumn", "_rescue_data")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://my-bucket/events/")
)

# _rescue_data is a JSON string containing unmatched fields
# Later processing can parse it: from_json(col("_rescue_data"), ...)

Schema Drift Detection and Logging


from pyspark.sql.functions import col

def detect_schema_drift():
    """
    Monitor incoming data for unexpected fields (schema drift).
    Log drift events for downstream awareness.
    """
    # Read Bronze table with rescue data
    df = spark.read.table("main.raw.bronze_events")

    # Filter to records with rescue data (unexpected fields)
    drift_df = df.filter(col("_rescue_data").isNotNull())

    # Extract field names from rescue data
    drift_summary = (
        drift_df
        .select(col("_rescue_data"), col("_ingestion_timestamp"))
        .groupBy(col("_ingestion_timestamp").cast("date").alias("drift_date"))
        .count()
        .orderBy(col("drift_date").desc())
    )

    print(f"Schema drift detected in {drift_df.count()} records.")
    drift_summary.show()

    # Persist drift summary to monitoring table
    drift_summary.write \
        .format("delta") \
        .mode("append") \
        .saveAsTable("main.monitoring.schema_drift_events")

if __name__ == "__main__":
    detect_schema_drift()

7. Partitioning & File Management

Effective partitioning and file optimization ensure Bronze layer scalability and query performance.

Partitioning Strategy

Partition Bronze tables by _ingestion_date (not year/month/day hierarchy) to balance partition sizes and enable efficient date-range queries:


-- Create Bronze table with single-level date partitioning
CREATE TABLE IF NOT EXISTS main.raw.bronze_events (
  event_id STRING,
  user_id LONG,
  event_type STRING,
  payload STRING,
  _ingestion_timestamp TIMESTAMP,
  _source_file STRING,
  _batch_id STRING,
  _ingestion_date DATE
)
USING DELTA
PARTITIONED BY (_ingestion_date);

-- Query with partition pruning
SELECT * FROM main.raw.bronze_events
WHERE _ingestion_date >= '2026-04-01' AND _ingestion_date < '2026-05-01';

File Size Optimization

Maintain target file sizes of 128MB–1GB per file to optimize S3 list operations and Spark task distribution:


def optimize_bronze_files():
    """
    Compact small Bronze files toward the 128MB-1GB target range.
    OPTIMIZE rewrites small files in place without changing table data,
    so no manual read/repartition/overwrite pass is needed.
    """
    spark.sql("""
        OPTIMIZE main.raw.bronze_events
        WHERE _ingestion_date >= current_date() - 7
        ZORDER BY (user_id)
    """)

    print("Bronze files optimized.")

if __name__ == "__main__":
    optimize_bronze_files()

OPTIMIZE and VACUUM

Periodically compact small files and remove old versions:


-- Compact files in recent partitions (improves query performance)
OPTIMIZE main.raw.bronze_events
WHERE _ingestion_date >= current_date() - 7
ZORDER BY _source_file;

-- Remove files older than the retention window (default is 7 days);
-- RETAIN takes hours, so 720 hours keeps 30 days of history
VACUUM main.raw.bronze_events RETAIN 720 HOURS;

-- Check table statistics
DESCRIBE EXTENDED main.raw.bronze_events;

8. Data Quality at Bronze

Implement quality checks to quarantine malformed records and track data quality metrics.

Quarantine Pattern for Malformed Records


from pyspark.sql.functions import (
    col, current_timestamp, input_file_name, lit, to_date, when, isnan
)
import uuid

batch_id = str(uuid.uuid4())
source_path = "s3://my-data-bucket/events/*.json"
target_table = "main.raw.bronze_events"
quarantine_table = "main.raw.quarantine_events"

def ingest_with_quarantine():
    """
    Ingest events with validation and quarantine pattern.
    Valid records go to Bronze; invalid ones go to Quarantine.
    """
    try:
        # Read raw data
        df = (
            spark.read
            .format("json")
            .option("inferSchema", "true")
            .load(source_path)
        )

        # Add metadata
        df = (
            df
            .withColumn("_ingestion_timestamp", current_timestamp())
            .withColumn("_source_file", input_file_name())
            .withColumn("_batch_id", lit(batch_id))
            .withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))
        )

        # Quality checks. Guard against NULL before isnan: isnan(NULL) is NULL,
        # which would silently exclude the record from both valid and quarantine.
        is_valid = (
            col("event_id").isNotNull() &
            col("user_id").isNotNull() &
            col("event_type").isNotNull() &
            col("amount").isNotNull() & ~isnan(col("amount"))
        )

        # Split into valid and quarantine
        valid_df = df.filter(is_valid)
        quarantine_df = df.filter(~is_valid).withColumn(
            "_quarantine_reason",
            when(col("event_id").isNull(), "null_event_id")
            .when(col("user_id").isNull(), "null_user_id")
            .when(col("event_type").isNull(), "null_event_type")
            .otherwise("invalid_amount")
        )

        # Write valid records to Bronze
        valid_df.write \
            .format("delta") \
            .mode("append") \
            .partitionBy("_ingestion_date") \
            .saveAsTable(target_table)

        # Write invalid records to Quarantine for investigation
        quarantine_df.write \
            .format("delta") \
            .mode("append") \
            .partitionBy("_ingestion_date") \
            .saveAsTable(quarantine_table)

        print(f"Processed: {valid_df.count()} valid, {quarantine_df.count()} quarantined.")

    except Exception as e:
        print(f"Ingestion with quarantine failed: {str(e)}")
        raise

if __name__ == "__main__":
    ingest_with_quarantine()

Data Quality Expectations with Delta Live Tables

For continuous data quality monitoring, use DLT expectations:


import dlt
from pyspark.sql.functions import col, current_timestamp, from_json, to_date
from pyspark.sql.types import StructType, StructField, StringType, LongType

events_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", LongType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", StringType(), True),
])

@dlt.table(
    name="bronze_events",
    comment="Raw events from Kafka with quality expectations",
    partition_cols=["_ingestion_date"]
)
@dlt.expect("valid_event_id", "event_id IS NOT NULL")
@dlt.expect("valid_user_id", "user_id > 0")
@dlt.expect("valid_timestamp", "timestamp IS NOT NULL")
def bronze_events():
    """
    Read from Kafka and apply quality expectations.
    @dlt.expect records pass/fail metrics without dropping rows;
    use @dlt.expect_or_drop to exclude failing rows instead.
    """
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "events")
        .option("startingOffsets", "latest")
        .load()
        .select(
            col("value").cast("string").alias("raw_payload"),
            col("timestamp").alias("_kafka_timestamp"),
        )
        .withColumn("payload", from_json(col("raw_payload"), events_schema))
        .select("raw_payload", "_kafka_timestamp", "payload.*")
        .withColumn("_ingestion_date", to_date(current_timestamp()))
    )

@dlt.table(
    name="quarantine_events",
    comment="Records that failed quality expectations"
)
def quarantine_events():
    """
    DLT does not create a quarantine table automatically; capture
    violating records explicitly by inverting the expectation predicates.
    """
    return (
        dlt.read_stream("bronze_events")
        .filter("event_id IS NULL OR user_id <= 0 OR timestamp IS NULL")
    )

9. Incremental Processing

Handle incremental updates and late-arriving data with watermarks and merge patterns.

Watermarking for Late-Arriving Data


from pyspark.sql.functions import (
    col, current_timestamp, from_json, window, lit
)
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Define message schema
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", LongType(), True),
    StructField("timestamp", StringType(), True),
])

def process_with_watermark():
    """
    Process a Kafka stream with a watermark to handle late-arriving data.
    The 10-minute watermark keeps aggregation state open for events arriving
    up to 10 minutes late; events arriving later than that are dropped.
    """
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "events")
        .option("startingOffsets", "latest")
        .load()
    )

    # Parse and add watermark
    df_parsed = (
        df
        .select(from_json(col("value").cast("string"), schema).alias("payload"))
        .select("payload.*")
        .select(
            col("event_id"),
            col("user_id"),
            col("timestamp").cast("timestamp").alias("event_time")
        )
        # Watermark: allow 10 minutes of late data before dropping
        .withWatermark("event_time", "10 minutes")
    )

    # Aggregate with windowing
    windowed_df = (
        df_parsed
        .groupBy(
            window(col("event_time"), "1 hour", "30 minutes"),
            col("user_id")
        )
        .count()
        .select(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("user_id"),
            col("count").alias("event_count"),
            current_timestamp().alias("_ingestion_timestamp")
        )
    )

    # Write to Bronze with trigger; toTable() starts the query
    query = (
        windowed_df
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/mnt/delta/checkpoints/windowed_events")
        .trigger(processingTime="5 minutes")
        .toTable("main.raw.bronze_windowed_events")
    )

    query.awaitTermination()

if __name__ == "__main__":
    process_with_watermark()

Merge Pattern for Upserts at Bronze

For sources requiring idempotent upserts (deduplication), use Delta merge:


from pyspark.sql.functions import col, current_timestamp, lit
import uuid

batch_id = str(uuid.uuid4())

def merge_with_deduplication():
    """
    Merge new events into Bronze table with deduplication.
    If event_id already exists, update only if new record is newer.
    """
    # Load new data and deduplicate within the batch so the MERGE
    # never matches the same target row twice
    new_events = (
        spark.read.json("s3://my-bucket/new_events.json")
        .withColumn("_ingestion_timestamp", current_timestamp())
        .withColumn("_batch_id", lit(batch_id))
        .dropDuplicates(["event_id"])
    )

    # Register as a temp view so it is visible to spark.sql
    new_events.createOrReplaceTempView("new_events")

    # Merge into target table
    spark.sql("""
        MERGE INTO main.raw.bronze_events AS target
        USING (
            SELECT
                event_id,
                user_id,
                event_type,
                timestamp,
                _ingestion_timestamp,
                _batch_id
            FROM new_events
        ) AS source
        ON target.event_id = source.event_id
        WHEN MATCHED AND source._ingestion_timestamp > target._ingestion_timestamp THEN
            UPDATE SET
                user_id = source.user_id,
                event_type = source.event_type,
                timestamp = source.timestamp,
                _ingestion_timestamp = source._ingestion_timestamp,
                _batch_id = source._batch_id
        WHEN NOT MATCHED THEN
            INSERT (event_id, user_id, event_type, timestamp, _ingestion_timestamp, _batch_id)
            VALUES (source.event_id, source.user_id, source.event_type, source.timestamp,
                    source._ingestion_timestamp, source._batch_id)
    """)

    print("Merge completed with deduplication.")

if __name__ == "__main__":
    merge_with_deduplication()

10. Production Patterns

Production Bronze layers employ multi-source architectures with source-specific tables and comprehensive monitoring.

Multi-Source Bronze Architecture

Create separate Bronze tables for each data source to maintain independence and enable source-specific monitoring:


from pyspark.sql.functions import (
    col, current_timestamp, input_file_name, lit, to_date
)
import uuid

def setup_multi_source_bronze():
    """
    Create Bronze layer with source-specific tables following naming convention.
    """
    spark.sql("CREATE SCHEMA IF NOT EXISTS main.raw")

    sources = [
        {
            "name": "sales_transactions",
            "path": "s3://data-bucket/sources/salesforce/transactions/",
            "format": "json",
            "system": "salesforce"
        },
        {
            "name": "customer_events",
            "path": "s3://data-bucket/sources/kafka/events/",
            "format": "json",
            "system": "kafka"
        },
        {
            "name": "erp_orders",
            "path": "s3://data-bucket/sources/sap/orders/",
            "format": "csv",
            "system": "sap"
        },
    ]

    for source in sources:
        batch_id = str(uuid.uuid4())
        table_name = f"main.raw.bronze_{source['name']}"
        checkpoint_path = f"/mnt/delta/checkpoints/bronze_{source['name']}"

        print(f"Setting up Bronze table: {table_name}")

        try:
            # Read source
            df = (
                spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", source["format"])
                .option("cloudFiles.schemaLocation",
                       f"/mnt/delta/schema_hints/{source['name']}")
                .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
                .option("cloudFiles.rescuedDataColumn", "_rescue_data")
                .load(source["path"])
            )

            # Add source-specific metadata
            enriched_df = (
                df
                .withColumn("_ingestion_timestamp", current_timestamp())
                .withColumn("_source_file", input_file_name())
                .withColumn("_source_system", lit(source["system"]))
                .withColumn("_batch_id", lit(batch_id))
                .withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))
            )

            # Write to Bronze; toTable() starts the query
            (
                enriched_df
                .writeStream
                .format("delta")
                .outputMode("append")
                .option("checkpointLocation", checkpoint_path)
                .option("mergeSchema", "true")
                .partitionBy("_ingestion_date")
                .trigger(availableNow=True)
                .toTable(table_name)
                .awaitTermination()
            )

        except Exception as e:
            print(f"Failed to setup {table_name}: {str(e)}")
            raise

if __name__ == "__main__":
    setup_multi_source_bronze()

Idempotent Ingestion with Source Tracking


from pyspark.sql.functions import col, md5, concat_ws, current_timestamp

def idempotent_ingest():
    """
    Idempotent ingestion: compute content hash to detect duplicates
    and skip re-ingesting identical data on retry.
    """
    # Load new data
    new_data = spark.read.json("s3://bucket/source_data.json")

    # Create content hash for idempotency detection.
    # Note: concat_ws skips NULLs, so cast columns to string and order them
    # deterministically to keep the hash stable across runs.
    new_data = (
        new_data
        .withColumn(
            "_content_hash",
            md5(concat_ws("||", *[col(c).cast("string") for c in sorted(new_data.columns)]))
        )
        .withColumn("_ingestion_timestamp", current_timestamp())
    )

    # Check if records already exist (by content hash)
    existing_hashes = (
        spark.read.table("main.raw.bronze_events")
        .select("_content_hash")
        .distinct()
    )

    # Anti-join to get only new records
    new_records = (
        new_data
        .join(existing_hashes, "_content_hash", "leftanti")
    )

    # Write only new records
    new_records.write \
        .format("delta") \
        .mode("append") \
        .saveAsTable("main.raw.bronze_events")

    print(f"Ingested {new_records.count()} new records (idempotent).")

if __name__ == "__main__":
    idempotent_ingest()

Monitoring and Alerting on Ingestion Lag


from pyspark.sql.functions import (
    col, max as spark_max, min as spark_min,
    current_timestamp, current_date, date_sub, unix_timestamp
)

def monitor_ingestion_lag():
    """
    Monitor ingestion lag (delay between event occurrence and Bronze arrival).
    Write metrics to monitoring table for alerting.
    """
    # Read recent Bronze records
    df = spark.read.table("main.raw.bronze_events")

    # Compute lag statistics (timestamp difference in minutes via unix seconds)
    lag_metrics = (
        df
        .filter(col("_ingestion_date") >= date_sub(current_date(), 1))
        .select(
            col("_source_file"),
            ((unix_timestamp(col("_ingestion_timestamp"))
              - unix_timestamp(col("timestamp"))) / 60).alias("lag_minutes")
        )
        .groupBy("_source_file")
        .agg(
            spark_max("lag_minutes").alias("max_lag_minutes"),
            spark_min("lag_minutes").alias("min_lag_minutes"),
            (spark_max("lag_minutes") - spark_min("lag_minutes")).alias("lag_range_minutes")
        )
        .withColumn("check_timestamp", current_timestamp())
    )

    # Persist metrics
    lag_metrics.write \
        .format("delta") \
        .mode("append") \
        .saveAsTable("main.monitoring.bronze_ingestion_lag")

    # Alert if lag exceeds threshold
    high_lag = lag_metrics.filter(col("max_lag_minutes") > 60)
    if high_lag.count() > 0:
        print("WARNING: High ingestion lag detected!")
        high_lag.show()

if __name__ == "__main__":
    monitor_ingestion_lag()

Naming Conventions and Catalog Organization


-- Catalog structure for multi-source Bronze layer
CREATE CATALOG IF NOT EXISTS main;
CREATE SCHEMA IF NOT EXISTS main.raw;

-- Source-specific tables follow pattern: bronze_{source}_{logical_entity}
CREATE TABLE IF NOT EXISTS main.raw.bronze_salesforce_accounts (...);
CREATE TABLE IF NOT EXISTS main.raw.bronze_salesforce_opportunities (...);
CREATE TABLE IF NOT EXISTS main.raw.bronze_kafka_events (...);
CREATE TABLE IF NOT EXISTS main.raw.bronze_sap_purchase_orders (...);
CREATE TABLE IF NOT EXISTS main.raw.bronze_stripe_transactions (...);

-- Quarantine and monitoring schemas
CREATE SCHEMA IF NOT EXISTS main.quarantine;
CREATE SCHEMA IF NOT EXISTS main.monitoring;

-- List all Bronze tables
SELECT table_name, table_type
FROM main.information_schema.tables
WHERE table_schema = 'raw' AND table_name LIKE 'bronze_%';

Conclusion

The Bronze layer serves as the foundation of the Medallion architecture, prioritizing raw data preservation, schema flexibility, and audit trails. By following the design principles of append-only ingestion, minimal transformation, and comprehensive metadata enrichment, Bronze layers enable trustworthy, replayable data pipelines that scale to petabyte volumes.

Key takeaways:

- Ingest data as-is; defer parsing, validation, and business logic to Silver and Gold
- Keep Bronze tables append-only and partition them by _ingestion_date
- Enrich every record with technical metadata (_ingestion_timestamp, _source_file, _batch_id)
- Use Auto Loader or COPY INTO for incremental, exactly-once file ingestion
- Quarantine malformed records instead of dropping them, and monitor schema drift and ingestion lag