Real-Time Embedding Pipeline: 1M Docs/Day, Sub-Minute Freshness

The brief: build a pipeline that re-embeds 1 million documents per day with sub-minute freshness from source-of-truth update to vector index. Source is a Postgres OLTP database (orders, support tickets, knowledge base articles) with a typical write rate of 20 updates/sec, bursting to 200 during business hours. Downstream consumers are RAG and search systems that must see fresh embeddings within 60 seconds of an upstream write.

Embedding pipelines fail in two predictable ways: (1) they fall behind during bursts and never catch up, or (2) they cost more in GPU spend than the product generates in revenue. Both are solved by treating the pipeline as a streaming system with backpressure and a cost-quality knob, not a batch job that runs every 5 minutes.



1. Problem & Functional Requirements

The brief in one list (restated from the introduction):

  1. Capture every insert, update, and delete on the source tables (orders, support tickets, KB articles) — typical write rate 20 updates/sec, bursting to 200.
  2. Chunk and embed changed documents, and upsert the vectors into the search index within 60 s of the upstream commit.
  3. Propagate deletes so removed rows stop appearing in retrieval within the same SLO.
  4. Support DLQ replay and online model upgrades without losing events or taking downtime.


2. Non-Functional Requirements & SLOs

Metric                    Target
------                    ------
End-to-end p50 lag        15 s   (DB commit to searchable)
End-to-end p95 lag        60 s
End-to-end p99 lag        5 min  (degraded mode acceptable)
Sustained throughput      1M docs/day ≈ 12 docs/sec avg
Burst tolerance           10× sustained for 15 min without lag breach
Data loss tolerance       zero — every commit must reach the vector index or the DLQ
Embedding cost ceiling    < $400/day at the sustained rate

3. Capacity Estimates

Chunking and embedding count. 1M docs/day × ~5 chunks/doc average (most docs are short, some are long) ≈ 5M embeddings/day ≈ 58 embeddings/sec sustained, ~580/sec at 10× burst.

GPU sizing for embedding workers. bge-large-en-v1.5 served by vLLM on a single L4 GPU at batch 256 yields ~3,000 chunks/sec. We need ~58/sec sustained, ~580/sec burst. One L4 handles steady state with 50× headroom. Two L4s for burst plus one warm spare = 3 L4s in the pool.

Kafka throughput. 12 doc-events/sec sustained at ~1 KB per event = 12 KB/sec. Trivial; a 3-broker Kafka cluster sized for the rest of the platform absorbs this without a dedicated topic shard. Burst is 120 events/sec ≈ 120 KB/sec, still trivial.

Vector store write throughput. 580 upserts/sec at burst into pgvector with HNSW: each insert triggers an HNSW link insertion. Measured at ~2k inserts/sec on db.r6i.4xlarge for 1024-dim vectors; well within budget. The pgvector HNSW build cost is the bottleneck if you go above ~5k inserts/sec sustained — at that point switch to Qdrant or batch builds.
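The arithmetic above can be sanity-checked in a few lines (all numbers taken from the estimates; nothing new):

```python
# Sanity check of the capacity estimates above (pure arithmetic).
SECONDS_PER_DAY = 86_400
docs_per_day = 1_000_000
chunks_per_doc = 5
l4_chunks_per_sec = 3_000     # bge-large on one L4 at batch 256, per the estimate

chunks_per_day = docs_per_day * chunks_per_doc      # 5,000,000 embeddings/day
sustained = chunks_per_day / SECONDS_PER_DAY        # ~58 embeddings/sec
burst = 10 * sustained                              # ~580/sec at 10x burst
headroom = l4_chunks_per_sec / sustained            # ~52x on a single L4
```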

End-to-end lag budget (p95 target 60 s):

postgres_commit:                 0 s   # baseline
debezium_capture_to_kafka:       2 s   # WAL polling + serialization
kafka_to_consumer:               1 s
chunking + dedupe:               2 s
batch_wait_for_embedder:        10 s   # max batch fill timeout
embedding_inference:             3 s
vector_store_upsert:             5 s
hnsw_index_visibility:          12 s   # pgvector immediate, but readers may pin
buffer_for_jitter:              25 s
---
total p95:                      60 s

4. High-Level Architecture

  +-----------------+
  |  Postgres OLTP  |  (source of truth)
  +--------+--------+
           |
           | logical replication slot
           v
  +-----------------+
  |   Debezium      |  (CDC connector, reads WAL)
  +--------+--------+
           |
           | one Kafka event per row change
           v
  +-----------------+      +------------------+
  |  Kafka topic    |---->|  Schema Registry |
  |  doc-changes    |      +------------------+
  +--------+--------+
           |
           v
  +-----------------+
  |  Chunker        |  (consumer group, 4 partitions)
  |  (Python/Go)    |  - normalize, split, dedupe by content_hash
  +--------+--------+
           |
           | chunk-tasks topic (partition by doc_id)
           v
  +-----------------+      +-------------------+
  |  Embed Worker   |<---->|  vLLM / Bedrock   |
  |  Pool (3 L4s)   |      |  embedding endpt  |
  +--------+--------+      +-------------------+
           |
           | upsert batches
           v
  +-----------------+
  |  Vector Store   |  (pgvector / Qdrant / Pinecone)
  +--------+--------+
           |
           v
  +-----------------+
  |  RAG / Search   |  (downstream consumers)
  +-----------------+

  Out-of-band:
  +-----------------+      +-------------------+
  |  DLQ Topic      |---->|  Operator UI      |  manual replay
  +-----------------+      +-------------------+

  +-----------------+
  |  Lag Exporter   |  publishes end-to-end lag to Prometheus
  +-----------------+

Component responsibilities:

  - Debezium: tails the WAL through a logical replication slot; emits one Kafka event per committed row change.
  - Chunker: normalizes text, splits it into ~512-token chunks with overlap, dedupes by content_hash, and emits chunk tasks partitioned by doc_id.
  - Embed worker pool: micro-batches chunk tasks, calls the vLLM endpoint, and upserts vectors in batches.
  - Vector store: serves similarity search to the RAG / search consumers.
  - DLQ + operator UI: quarantines poison messages for manual replay.
  - Lag exporter: publishes end-to-end freshness lag to Prometheus.


5. Data Model & Idempotency

The pipeline must be idempotent on every step. Re-delivery from Kafka, re-processing after a worker crash, replay from DLQ — all must produce the same final state in the vector store.

-- Source table (existing OLTP)
CREATE TABLE articles (
    article_id   BIGINT PRIMARY KEY,
    title        TEXT,
    body         TEXT,
    updated_at   TIMESTAMPTZ DEFAULT now(),
    deleted_at   TIMESTAMPTZ
);

-- Vector store (pgvector, same DB or a replica)
CREATE TABLE article_chunks (
    chunk_id      UUID PRIMARY KEY,
    article_id    BIGINT NOT NULL,
    chunk_index   INT NOT NULL,
    content_hash  BYTEA NOT NULL,        -- sha256(chunk_text); skip embed if unchanged
    text          TEXT NOT NULL,
    embedding     VECTOR(1024),
    embed_model   TEXT NOT NULL,
    embed_version SMALLINT NOT NULL,
    source_lsn    PG_LSN NOT NULL,       -- watermark from CDC for idempotency + lag tracking
    updated_at    TIMESTAMPTZ DEFAULT now(),
    UNIQUE (article_id, chunk_index)
);

CREATE INDEX article_chunks_hnsw ON article_chunks
    USING hnsw (embedding vector_cosine_ops) WITH (m = 32, ef_construction = 200);
CREATE INDEX article_chunks_article ON article_chunks (article_id);
CREATE INDEX article_chunks_lsn ON article_chunks (source_lsn);

Two-layer idempotency. First, the Kafka consumer commits offsets only after a successful upsert, so crashes replay from the last durable offset. Second, the upsert uses ON CONFLICT (article_id, chunk_index) DO UPDATE SET ... WHERE EXCLUDED.source_lsn > article_chunks.source_lsn, so older replays cannot overwrite newer state, even when they arrive out of order.
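Spelled out, the guarded upsert looks like the following (a sketch matching the article_chunks DDL above; asyncpg-style placeholders assumed):

-- The WHERE clause turns stale replays into no-ops instead of overwrites.
INSERT INTO article_chunks AS ac
    (chunk_id, article_id, chunk_index, content_hash, text,
     embedding, embed_model, embed_version, source_lsn)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (article_id, chunk_index) DO UPDATE
SET content_hash  = EXCLUDED.content_hash,
    text          = EXCLUDED.text,
    embedding     = EXCLUDED.embedding,
    embed_model   = EXCLUDED.embed_model,
    embed_version = EXCLUDED.embed_version,
    source_lsn    = EXCLUDED.source_lsn,
    updated_at    = now()
WHERE EXCLUDED.source_lsn > ac.source_lsn;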

Content-hash short-circuit. Before embedding, compare sha256(chunk_text) against the stored hash. If unchanged (e.g. the row was touched but the text didn't change), skip the embedding call entirely. In practice this filters 30–60% of CDC events, cutting GPU spend by the same fraction.


6. Critical Path: Update Propagation

async def chunker_worker(consumer: AIOKafkaConsumer):
    while True:
        # aiokafka exposes batch consumption via getmany(), which returns
        # whatever is buffered within the timeout, grouped by partition
        batches = await consumer.getmany(timeout_ms=1000, max_records=500)
        chunks_to_embed = []
        for msgs in batches.values():
            for msg in msgs:
                event = DocChangeEvent.parse(msg.value)
                if event.op == "delete":
                    await tombstone(event.article_id, event.lsn)
                    continue

                chunks = chunk_text(event.after.body, target_tokens=512, overlap=64)
                for i, text in enumerate(chunks):
                    h = sha256(text.encode()).digest()
                    # Short-circuit: skip unchanged chunks
                    if await current_hash_equals(event.article_id, i, h):
                        continue
                    chunks_to_embed.append(ChunkTask(
                        chunk_id=deterministic_uuid(event.article_id, i),
                        article_id=event.article_id,
                        chunk_index=i,
                        text=text,
                        content_hash=h,
                        source_lsn=event.lsn,
                    ))

        if chunks_to_embed:
            await chunk_tasks_producer.send_batch(chunks_to_embed)
        if batches:
            await consumer.commit()  # only after enqueue succeeds
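chunk_text above is assumed; here is a minimal sketch using whitespace tokens as a stand-in for real tokenizer tokens (production would count tokens with the embedding model's tokenizer):

```python
def chunk_text(body: str, target_tokens: int = 512, overlap: int = 64) -> list[str]:
    # Sliding window over whitespace "tokens": each chunk holds up to
    # target_tokens words and repeats the last `overlap` words of its
    # predecessor so sentences split at a boundary stay retrievable.
    words = body.split()
    step = target_tokens - overlap
    chunks: list[str] = []
    for start in range(0, len(words), step):
        piece = words[start:start + target_tokens]
        if piece:
            chunks.append(" ".join(piece))
        if start + target_tokens >= len(words):
            break
    return chunks
```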


async def embed_worker(consumer: AIOKafkaConsumer):
    batcher = MicroBatcher(max_size=256, max_wait_ms=50)
    while True:
        # Poll with a timeout so a partially filled batch still flushes on
        # quiet topics instead of waiting for the next message to arrive
        records = await consumer.getmany(timeout_ms=50, max_records=256)
        for msgs in records.values():
            for msg in msgs:
                await batcher.add(ChunkTask.parse(msg.value))
        if batcher.ready():  # full, or max_wait_ms elapsed since first add
            batch = await batcher.flush()
            vectors = await vllm_client.embed([t.text for t in batch])
            await pg.upsert_chunks([
                ChunkRow(
                    chunk_id=t.chunk_id,
                    article_id=t.article_id,
                    chunk_index=t.chunk_index,
                    content_hash=t.content_hash,
                    text=t.text,
                    embedding=v,
                    embed_model="bge-large-en-v1.5",
                    embed_version=2,
                    source_lsn=t.source_lsn,
                ) for t, v in zip(batch, vectors)
            ])
            await consumer.commit()

Deterministic chunk IDs. deterministic_uuid(article_id, i) produces the same UUID for the same (article, chunk_index) pair, so re-delivery becomes a natural upsert without orphan rows. Chunk count changes (a shorter article producing fewer chunks) are handled by a tail-deletion step that removes chunk_index >= new_chunk_count.
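A minimal implementation of the assumed deterministic_uuid helper — UUIDv5 over a fixed namespace makes the ID a pure function of (article_id, chunk_index):

```python
import uuid

# Namespace chosen arbitrarily for illustration; any fixed UUID works,
# as long as it never changes once chunks have been written.
CHUNK_NS = uuid.uuid5(uuid.NAMESPACE_URL, "pipeline/article-chunks")

def deterministic_uuid(article_id: int, chunk_index: int) -> uuid.UUID:
    # Same inputs always yield the same UUID, so Kafka re-delivery and DLQ
    # replay upsert the existing row instead of creating an orphan.
    return uuid.uuid5(CHUNK_NS, f"{article_id}:{chunk_index}")
```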


7. Scaling, Backpressure & Quality Knob

Backpressure. Kafka consumer-group lag is the source of truth. The lag exporter compares the latest consumed offset with the latest produced offset; when the time-equivalent lag exceeds one minute, the autoscaler adds embed workers. When the embed worker pool itself is saturated (queue depth on the vLLM endpoint > 100), the chunker pauses consumption (reduces its Kafka poll rate) rather than building an unbounded backlog inside the pipeline.
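A sketch of the lag-driven scaling policy (the 60 s threshold is from the SLO table; the doubling/decrement policy and worker caps are assumptions, not from the source):

```python
def desired_embed_workers(lag_seconds: float, current: int,
                          min_workers: int = 1, max_workers: int = 10) -> int:
    # Scale up aggressively on an SLO breach; scale down one worker at a
    # time, and only well below target, to avoid flapping at the threshold.
    if lag_seconds > 60:
        return min(max_workers, current * 2)
    if lag_seconds < 15 and current > min_workers:
        return current - 1
    return current
```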

Cost-quality knob. Three operating modes, switched by a single config flag based on lag:

Mode      Trigger              Embed model        Batch  Throughput
QUALITY   lag < 30 s           bge-large (1024d)  256    3,000 chunks/sec
BALANCED  30 s ≤ lag < 5 min   bge-base (768d)    512    8,000 chunks/sec
SHED      lag ≥ 5 min          bge-base (768d)    1024   15,000 chunks/sec;
                                                         downsample updates < 1 h old by 50%

SHED mode embeds a sampled subset of recent updates and queues the rest for backfill once lag recovers. Which updates to defer is policy: "low-priority tenants first," "update events only (not creates)," and so on. The key property is that the system never silently drops — deferred events are queued in a backfill topic, not lost. One subtlety the table glosses over: bge-base emits 768-dim vectors, which can neither be stored in the 1024-dim column nor mixed with bge-large neighbors without wrecking recall. Degraded-mode output therefore goes to a parallel table tagged by embed_model/embed_version, and the backfill consumer re-embeds it with bge-large once lag recovers, so the main index never mixes models.
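The mode switch reduces to a threshold function over measured lag (thresholds straight from the table above):

```python
def select_mode(lag_seconds: float) -> str:
    # QUALITY under 30 s of lag, BALANCED up to 5 min, SHED beyond that.
    if lag_seconds < 30:
        return "QUALITY"
    if lag_seconds < 300:
        return "BALANCED"
    return "SHED"
```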

Bottlenecks at scale, in order:

  1. HNSW insert cost in pgvector saturates around 5k inserts/sec. Above that, switch to Qdrant or batch index rebuilds.
  2. Kafka rebalance pauses on consumer group changes can stall the pipeline for 10–30 s. Use static membership and incremental cooperative rebalancing.
  3. Postgres replication slot bloat when Debezium is slow or paused. Monitor pg_replication_slots.confirmed_flush_lsn; alert if WAL retention exceeds 10 GB.
  4. vLLM batch fill latency on low-traffic periods: max_wait_ms=50 means a single isolated event waits 50 ms before embedding. Below ~10 docs/sec, the wait dominates inference latency. Acceptable: this is sub-second by construction.

8. Failure Modes & DLQs

DLQ replay protocol. The operator UI shows DLQ messages grouped by error fingerprint; once a fix is deployed, "replay" emits the messages back to chunk-tasks with a header indicating retry. Replays go through the same idempotency path; the worst case is wasted GPU on a re-embed that produces the same vector.
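The error fingerprint that groups DLQ messages could be as simple as hashing the exception type plus a digit-masked first line of its message (a hypothetical helper; the source does not specify the grouping key):

```python
import hashlib
import re

def error_fingerprint(exc: Exception) -> str:
    # Grouping key for the operator UI: exception type plus the first line
    # of its message with volatile digits masked, hashed to a stable short
    # ID so "row 7" and "row 9" failures land in the same bucket.
    first_line = str(exc).splitlines()[0] if str(exc) else ""
    normalized = re.sub(r"\d+", "N", first_line)
    raw = f"{type(exc).__name__}:{normalized}"
    return hashlib.sha256(raw.encode()).hexdigest()[:12]
```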


9. Cost Analysis

Per 1M source events, with 50% short-circuited by content-hash dedupe (so 500k actually embedded):

debezium_msk:                 $1.00   # amortized small Kafka cluster
chunker_workers:              $0.50   # 2 small instances
embed_workers_l4_gpu:         $4.80   # assumes the 3-L4 pool is shared/autoscaled;
                                      # 3 dedicated on-demand L4s @ $0.80/hr 24/7 = $57.60/day
  # alternative: bedrock_titan: ~$50.00 (~10x more for managed)
vector_store_writes:          $0.30   # pgvector amortized
storage_growth:               $0.10   # ~5GB/day at 1M new chunks
monitoring + s3 dlq:          $0.20
---
total_per_1M_events:          $6.90
sustained_daily_cost:         $6.90  # at 1M docs/day
sustained_monthly_cost:       $207
budget_ceiling:               $400/day — well under

The dominant variable is self-hosted vLLM vs managed embedding API. At our scale (1M docs/day) self-hosted L4 is ~10× cheaper than Bedrock Titan or OpenAI text-embedding-3. The crossover where managed becomes cheaper is around < 50k docs/day, where the L4 sits idle most of the time.


10. Tradeoffs & Alternatives

Debezium vs application dual-write vs polling. Debezium reads the WAL, so it captures every committed change with no application changes required. Dual-write (the app writes to both DB and Kafka) is faster to implement but breaks every time someone forgets to add the Kafka publish. Polling is simple but trades lag against DB load. We pick Debezium because it makes the "every commit reaches Kafka" guarantee a property of the database, not of application code.

Kafka vs Kinesis vs Pulsar. Kafka has the richest connector ecosystem (Debezium first-class) and the best operational tooling. Kinesis is fine if you're already AWS-heavy and don't need Kafka's exact features; the 24-hour retention default is dangerous for replay scenarios. Pulsar's per-topic geo-replication is nice but the tooling lags Kafka. We pick Kafka.

vLLM vs Bedrock for embedding. Self-hosted vLLM wins on cost and latency at our scale; loses on operational simplicity. Bedrock Titan embedding wins when traffic is bursty and idle GPUs are wasted spend. The hybrid approach — self-hosted for steady state, Bedrock burst on overflow — sounds clever but introduces an embedding-version inconsistency that breaks vector search recall. Pick one model and stick to it.

Snowflake / Databricks streams instead of Postgres CDC. If the source of truth is a warehouse rather than an OLTP DB, Snowflake Streams or Delta Live Tables give the same change-capture semantics with less operational burden — no logical replication slots to manage. The lag floor is higher (warehouse commit cadence is typically minutes, not seconds), so the freshness SLO has to relax to ~5 min p95.

Re-embed everything on model upgrade. When bumping embed_version, a naive re-embed of 5M chunks runs at ~3k chunks/sec on one L4 and finishes in ~30 min — well under a dollar of GPU time at $0.80/hr. Do it as a side pipeline that writes to a new table and swaps the index pointer atomically; never re-embed in place, because search recall drops while v1 and v2 vectors coexist in the same index.


11. Common Interview Q&A

Q1: Why CDC instead of having the application publish change events directly?

CDC moves the "every commit gets captured" guarantee from application code into the database. Application dual-write is a distributed transaction across Postgres and Kafka without a coordinator; it will lose events the first time someone forgets to add the publish call to a new endpoint, or when a transaction rolls back after the Kafka publish succeeded. Debezium reading the WAL is, by construction, exactly the events that committed. The cost is operational (one more system to run) but the correctness gain is large.

Q2: How do you ensure exactly-once semantics end-to-end?

Honestly, you don't — you ensure effectively-once via idempotency. Kafka offers at-least-once delivery; the embed worker may re-process a message after a crash. The deterministic chunk_id and ON CONFLICT ... WHERE source_lsn > ... upsert ensure that re-processing produces the same final state. Sales calls it "exactly once"; engineers call it "at-least-once with idempotent sinks," which is what every modern streaming system actually does.

Q3: How do you handle a 1000x burst from a bulk import?

The chunker detects sustained backlog and switches to BALANCED then SHED mode. The embed worker pool autoscales up to its hard cap (say 10 L4s). Beyond that, the chunker downsamples low-priority updates and routes them to a backfill topic with relaxed SLO. After the burst subsides, a backfill consumer drains the deferred topic. The pipeline never stops accepting events; it gracefully degrades freshness for the tail of the backlog while keeping sub-minute freshness for the leading edge.

Q4: How does the lag exporter actually measure end-to-end lag?

It periodically (every 5 s) reads SELECT pg_current_wal_lsn() from the source DB and SELECT MAX(source_lsn) FROM article_chunks from the vector store. The difference, expressed in WAL bytes, divided by the recent write rate, gives a time-equivalent lag. Exposed as a Prometheus gauge with a single alert at > 60 s for 5 min. Beats trying to measure each stage's contribution separately.
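The time-equivalent conversion is one line (LSNs treated as byte positions, per the answer above):

```python
def lag_seconds(source_wal_lsn: int, max_indexed_lsn: int,
                recent_wal_bytes_per_sec: float) -> float:
    # Unprocessed WAL bytes divided by the recent write rate gives a
    # time-equivalent lag; clamp at zero for a fully caught-up index.
    if recent_wal_bytes_per_sec <= 0:
        return 0.0   # no recent writes: nothing to be behind on
    return max(0, source_wal_lsn - max_indexed_lsn) / recent_wal_bytes_per_sec
```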

Q5: A row is deleted in Postgres. How do you ensure its chunks are removed from the vector index within the SLO?

Debezium emits a delete event carrying the primary key. The chunker enqueues a tombstone task; the embed worker (or a dedicated tombstone worker) issues DELETE FROM article_chunks WHERE article_id = $1. The deleted rows stop matching on the next query (pgvector filters dead tuples at read time), so the 60-second SLO is met as long as the CDC pipeline is healthy. For hard-delete obligations such as GDPR Art. 17 erasure, we additionally run a weekly index-rebuild job to physically reclaim the tombstoned vectors.

Q6: Walk me through what happens when the embed model is upgraded from v1 to v2.

(1) Spin up a parallel pipeline writing to article_chunks_v2 with the new model; the existing v1 pipeline keeps running. (2) Backfill v2 by replaying the historical CDC log (or by full table scan); takes ~30 min for 5M chunks at 3k/sec. (3) Run the eval suite on both indexes; only proceed if v2 wins on the relevant retrieval metrics. (4) Atomically swap the search index pointer in app config (a single config-server update). (5) Keep v1 for a week as rollback insurance, then drop. The whole upgrade is online — no readers see a moment of mixed-version vectors.

