Agent Orchestration: 1,000 Concurrent Workflows, Checkpointing, HITL, Cost Limits

The brief: build a multi-agent orchestration platform that runs 1,000 concurrent agentic workflows with deterministic checkpointing (any node can resume any workflow from its last successful step), human-in-the-loop approvals at designated pause points, and per-workflow cost ceilings enforced at the LLM call site so a runaway agent cannot burn an unbounded amount of money.

"Multi-agent platform" is a phrase that hides several quite different problems. The hard ones are not "how do agents call tools?" but "how do you replay a seven-minute workflow from a checkpoint without re-executing the side-effecting tool calls?", "how do you pause a workflow for three days while a human approves and not lose state?", and "how do you stop a 50-iteration loop from costing $300?" This design is opinionated about all three.



1. Problem & Functional Requirements

The platform runs workflows defined as DAGs of nodes. Each node is one of:

  - LLM node: one model call (optionally with tool use) whose prompt is built from accumulated workflow state.
  - Tool node: one side-effecting call (HTTP API, RPC, or internal function) that must execute exactly once.
  - Human gate: a pause point that waits for an external approval/edit/reject signal, up to a timeout.
  - Branch: a conditional edge that picks the next node(s) from state.

Functional requirements:

  - Run 1,000 concurrent workflows (5,000 burst) defined as versioned DAGs.
  - Checkpoint after every step; any worker on any node can resume any workflow from its last successful step.
  - Human-in-the-loop gates that pause a workflow for days and resume on a signal, with no state loss.
  - Per-workflow cost ceilings enforced at the LLM call site, as a hard stop.
  - Tool access control per definition, and a full audit history of every LLM and tool call.

Out of scope: model fine-tuning, the agent's reasoning-loop quality (that's a prompt-engineering problem), and the human-in-the-loop UI itself (a separate front-end project).


2. Non-Functional Requirements & SLOs

Metric                                       Target
Concurrent active workflows                  1,000 sustained, 5,000 burst
Step scheduling latency (next-step pickup)   p95 < 200 ms after prior step completes
Checkpoint write latency                     p95 < 50 ms
Pause durability                             up to 30 days, no state loss across restarts
Cost ceiling enforcement                     hard stop within 1 LLM call after breach
Availability                                 99.9% (workflows are durable; a brief outage delays, doesn't lose)
Replay determinism                           side-effecting tools are not re-invoked on resume

The "side-effecting tools are not re-invoked on resume" SLO is the hardest. Naive checkpointing replays the workflow from the start; if step 3 sent an email and we crash on step 5, resume must not send the email twice. The solution is event-sourced state with persisted tool results — replay reads from history, doesn't re-execute.


3. Capacity Estimates

Active workflows. 1,000 sustained × ~10 steps each × ~3s per step (LLM call dominates) ≈ 333 step-completions/sec across the cluster. That's a worst-case sizing figure: it assumes every workflow is actively stepping, while in practice many are parked at human gates. Each completion = one checkpoint write + one queue push.

Checkpoint storage. Each step appends an event of ~5–50 KB (LLM input/output tokens dominate). 333 events/sec × 86400 s × 20 KB ≈ ~580 GB/day of raw history. Tier hot 30d in Postgres (~17 TB — partition aggressively), cold to S3 thereafter.

State store. Active workflows in flight: 1,000 × ~200 KB working state ≈ 200 MB in Redis. Trivial. Persistent durable state in Postgres: 1,000 × ~5 MB cumulative history (compressed) ≈ 5 GB — fits in RAM.

LLM token spend. Average step ~2,000 input + 500 output tokens (tool definitions are big). At Claude Sonnet pricing ($3 in / $15 out per 1M):

per_step_cost: (2000 * 3 + 500 * 15) / 1_000_000 = $0.0135
per_workflow:  10 steps * $0.0135 = $0.135 average
daily_total:   1,000 slots * (24h / ~3h per run, human gates dominate wall-clock)
               = 8,000 runs/day * $0.135 ~= $1,080/day = ~$32k/month

That's the average. The point of cost limits: a misbehaving workflow that loops for 200 steps burns ~$2.70 at $0.0135/step, 20x the average run; and because agent loops accumulate context, late iterations cost several times an early one, pushing the real multiplier toward 100x. Without enforcement, one bug in an agent definition can dominate the monthly bill.


4. High-Level Architecture

                  +------------------+
   client -------->|  Workflow API    |  (start, signal, query)
                  +--------+---------+
                           |
                           v
                  +------------------+
                  |  Scheduler       |--+
                  +--------+---------+  |
                           |            | enqueue ready steps
                           v            v
                  +-------------------+      +-----------------------+
                  |  Redis Streams    |<---->|  Worker Pool          |
                  |  (ready queue)    |      |  (Python, K8s)        |
                  +-------------------+      +----------+------------+
                                                        |
            +-------------------------+-----------------+----------------+
            |                         |                                  |
            v                         v                                  v
  +-------------------+   +-----------------------+         +---------------------+
  |  LLM Gateway      |   |  Tool Executor        |         |  Postgres            |
  |  (cost enforce,   |   |  (ACL check, retries) |         |  workflows, events,  |
  |   logging)        |   +-----------+-----------+         |  signals, tool_calls |
  +---------+---------+               |                     +----------+-----------+
            |                         v                                |
            v               +-------------------+                      |
  +-------------------+     |  Tool Registry    |                      |
  |  Bedrock/Anthropic|     |  (HTTP APIs, RPCs,|                      |
  |  /OpenAI          |     |   internal funcs) |                      |
  +-------------------+     +-------------------+                      |
                                                                       |
                              +----------------------+                 |
                              |  Cost Tracker        |<----------------+
                              |  (Redis counters +   |
                              |   nightly rollup)    |
                              +----------------------+

                              +----------------------+
                              |  Human-Gate UI       |
                              |  (signals API)       |
                              +----------------------+

Components, one line each:

  - Workflow API: start a workflow, deliver a human signal, query current state.
  - Scheduler: finds ready steps and expired gates; enqueues them onto the stream.
  - Redis Streams: the ready queue; consumer groups fan steps out to workers with ack/redeliver.
  - Worker Pool: stateless Python workers on K8s; each picks up a step, replays state, executes the node.
  - LLM Gateway: the single choke point for model calls; enforces cost ceilings, logs usage.
  - Tool Executor: checks tool ACLs, applies retries, writes the side-effect ledger.
  - Tool Registry: the catalog of invocable HTTP APIs, RPCs, and internal functions.
  - Postgres: durable source of truth (workflows, events, signals, tool_calls).
  - Cost Tracker: Redis counters on the hot path, rolled up nightly into Postgres.
  - Human-Gate UI: where reviewers approve/edit/reject; talks to the signals API.

The choice to put Redis Streams in front of Postgres (rather than just polling Postgres) buys two things: sub-100ms scheduling latency without hammering Postgres for the next-step query, and a clean consumer-group pattern for worker fan-out with built-in ack/redeliver.


5. Data Model & Checkpoint Schema

Event-sourced. The events table is the source of truth; current workflow state is the fold of its events. This is the only model that gets replay-without-side-effects right.

CREATE TABLE workflows (
  workflow_id    UUID PRIMARY KEY,
  definition_id  TEXT NOT NULL,            -- e.g. "support-triage"
  definition_ver INT  NOT NULL,            -- pinned at start
  tenant_id      UUID NOT NULL,
  status         TEXT NOT NULL,            -- running | waiting_signal | completed | failed | budget_blocked
  current_node   TEXT,
  cost_limit_usd NUMERIC(10,4) NOT NULL,
  cost_used_usd  NUMERIC(10,4) DEFAULT 0,
  started_at     TIMESTAMPTZ DEFAULT now(),
  finished_at    TIMESTAMPTZ
);
CREATE INDEX workflows_status ON workflows (status, started_at);

CREATE TABLE events (
  event_id       BIGSERIAL PRIMARY KEY,    -- monotonic per-DB; sequence not workflow-scoped
  workflow_id    UUID NOT NULL REFERENCES workflows ON DELETE CASCADE,
  seq            INT  NOT NULL,            -- per-workflow monotonic; (workflow_id, seq) unique
  node_name      TEXT NOT NULL,
  event_type     TEXT NOT NULL,            -- step_started | llm_response | tool_result | branch_taken | signal_received | error
  payload        JSONB NOT NULL,           -- inputs, outputs, errors
  cost_usd       NUMERIC(10,6) DEFAULT 0,
  created_at     TIMESTAMPTZ DEFAULT now(),
  UNIQUE (workflow_id, seq)
);
-- No separate (workflow_id, seq) index needed: the UNIQUE constraint above already creates one.

CREATE TABLE tool_calls (
  tool_call_id   UUID PRIMARY KEY,
  workflow_id    UUID NOT NULL REFERENCES workflows ON DELETE CASCADE,
  seq            INT  NOT NULL,            -- matches events.seq
  tool_name      TEXT NOT NULL,
  idem_key       TEXT NOT NULL,            -- sha256(workflow_id, seq, tool_name, canonical_request)
  request        JSONB NOT NULL,
  response       JSONB,
  status         TEXT NOT NULL,            -- pending | ok | error
  created_at     TIMESTAMPTZ DEFAULT now(),
  UNIQUE (idem_key)
);

CREATE TABLE signals (
  signal_id      UUID PRIMARY KEY,
  workflow_id    UUID NOT NULL REFERENCES workflows ON DELETE CASCADE,
  node_name      TEXT NOT NULL,            -- which gate is pending
  expected_type  TEXT NOT NULL,            -- "approval" | "edit" | "reject"
  payload        JSONB,                    -- the human's response
  created_at     TIMESTAMPTZ DEFAULT now(),
  resolved_at    TIMESTAMPTZ
);

The tool_calls table is the side-effect ledger. Before invoking any tool, the worker computes idem_key = sha256(workflow_id, seq, tool_name, canonical_request) and tries to insert a pending row. If the insert fails with unique-violation, the side-effect already happened — read the existing row's response instead. This makes tool execution exactly-once even across worker crashes and replays.
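
A minimal sketch of that claim-or-read step, assuming asyncpg and the tool_calls schema above (claim_tool_call and canonical_idem are illustrative helpers, not a published API):

import hashlib
import json
import uuid

import asyncpg

def canonical_idem(workflow_id, seq: int, tool_name: str, request: dict) -> str:
    # Canonical JSON (sorted keys, no whitespace) so logically equal requests hash equally.
    body = json.dumps(request, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(f"{workflow_id}:{seq}:{tool_name}:{body}".encode()).hexdigest()

async def claim_tool_call(conn: asyncpg.Connection, workflow_id, seq, tool_name, request):
    """Returns (claimed, prior_row). claimed=True means this worker owns the side effect."""
    idem_key = canonical_idem(workflow_id, seq, tool_name, request)
    inserted = await conn.fetchrow(
        """INSERT INTO tool_calls (tool_call_id, workflow_id, seq, tool_name,
                                   idem_key, request, status)
           VALUES ($1, $2, $3, $4, $5, $6, 'pending')
           ON CONFLICT (idem_key) DO NOTHING
           RETURNING tool_call_id""",
        uuid.uuid4(), workflow_id, seq, tool_name, idem_key, json.dumps(request))
    if inserted:
        return True, None   # we own it: invoke the tool, then UPDATE status/response
    row = await conn.fetchrow(
        "SELECT status, response FROM tool_calls WHERE idem_key = $1", idem_key)
    return False, row       # someone already ran (or is running) this side effect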

Workflow state computation. "What's next?" is a fold: read all events for the workflow ordered by seq, replay through the workflow definition's reducer, output the current node and accumulated state. This is fast (10s-of-events typical) and means resume is just "read events, replay".
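
What the fold looks like, as a sketch. Event shapes follow the events table; the reducer assumes single-successor edges for brevity, and next_after is the definition's edge lookup:

from dataclasses import dataclass, field

@dataclass
class WorkflowState:
    input: dict
    current_node: str | None = None
    events: dict = field(default_factory=dict)   # node_name -> latest completed payload
    cost_used: float = 0.0

def replay(definition, input: dict, history: list[dict]) -> WorkflowState:
    """Pure fold: nothing is re-invoked; all outputs come from persisted payloads."""
    state = WorkflowState(input=input, current_node=definition.start_node)
    for ev in history:                            # ordered by seq
        state.cost_used += float(ev["cost_usd"])
        if ev["event_type"] in ("llm_response", "tool_result", "signal_received"):
            state.events[ev["node_name"]] = ev["payload"]
            # Step finished: advance along the definition's edges.
            state.current_node = definition.next_after(ev["node_name"], state)
    return state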


6. Critical Path: One Workflow Step

A workflow definition is a Python module declaring nodes and edges:

from platform import Workflow, llm_node, tool_node, human_gate, branch

wf = Workflow(name="support-triage", version=4, cost_limit_usd=1.00)

@wf.node("classify")
@llm_node(model="claude-haiku-4-5", tools=["search_kb"])
async def classify(state):
    return {
        "system": "Classify the ticket as: bug, feature, billing, or other.",
        "user":   state.input["ticket_text"],
    }

@wf.node("draft_reply")
@llm_node(model="claude-sonnet-4-7", tools=["search_kb", "fetch_account"])
async def draft_reply(state):
    return {
        "system": REPLY_SYSTEM_PROMPT,
        "user":   state.events["classify"].output,
    }

@wf.node("approval")
@human_gate(timeout_days=3)
async def approval(state):
    return {"reviewers": ["support-leads@"], "diff": state.events["draft_reply"].output}

@wf.node("send_reply")
@tool_node(name="send_email")
async def send_reply(state):
    return {"to": state.input["customer_email"], "body": state.events["approval"].payload["edited_body"]}

# Edges
wf.edge("classify", "draft_reply")
wf.edge("draft_reply", "approval")
wf.edge("approval", "send_reply")

Per-step execution inside a worker:

async def execute_step(workflow_id: UUID, seq: int, node_name: str):
    # 1. Load workflow + replay events to current state
    wf = await load_workflow(workflow_id)
    history = await load_events(workflow_id)   # full event list, ordered by seq
    state = wf.definition.replay(history)      # 'events' below is the event store, not this list

    if state.current_node != node_name:
        return  # stale message; drop, scheduler will requeue correct node

    node = wf.definition.get_node(node_name)

    # 2. Compute idempotency key. Insert "started" event idempotently.
    started = await events_insert_if_absent(workflow_id, seq, "step_started",
                                            payload={"node": node_name})
    if not started:
        # Another worker already started this step; skip.
        return

    # 3. Execute the node
    if isinstance(node, LLMNode):
        prompt = await node.build_prompt(state)
        result = await llm_gateway.call(
            workflow_id=workflow_id,
            model=node.model,
            messages=prompt,
            tools=node.tool_specs,
        )  # raises BudgetExceededError if call would breach cost_limit_usd
        await events.insert(workflow_id, seq, "llm_response",
                            payload=result.dict(), cost_usd=result.cost)

    elif isinstance(node, ToolNode):
        request = await node.build_request(state)
        idem_key = canonical_idem(workflow_id, seq, node.tool_name, request)
        # See if we already executed this side-effect
        existing = await tool_calls.get(idem_key)
        if existing and existing.status == "ok":
            response = existing.response
        else:
            response = await tool_executor.invoke(node.tool_name, request,
                                                  idem_key=idem_key)
        await events.insert(workflow_id, seq, "tool_result", payload=response)

    elif isinstance(node, HumanGate):
        await signals.create(workflow_id, node_name, expected_type="approval")
        await workflows.set_status(workflow_id, "waiting_signal")
        return  # Workflow halts until signal arrives.

    # 4. Schedule next ready node(s)
    next_nodes = wf.definition.next_after(node_name, state)
    for nxt in next_nodes:
        await ready_queue.push(workflow_id, seq + 1, nxt)

Several invariants are doing the work here:

  - Exactly one worker runs a step: the idempotent step_started insert (unique on workflow_id, seq) is the claim; losers of the race return immediately.
  - Stale or redelivered messages are harmless: the replay check (state.current_node != node_name) drops anything for a step that already moved on.
  - Side effects run at most once: the tool ledger's idem_key turns a re-execution into a read of the prior response.
  - A human gate leaves nothing in flight: the worker returns after writing the signal row, so a 3-day wait costs no compute and survives restarts.

The cost-limited LLM call wrapper:

class BudgetExceededError(Exception): pass

class LLMGateway:
    async def call(self, workflow_id: UUID, model: str, messages: list, tools: list):
        # 1. Read current cost; estimate worst-case for this call.
        used = float(await redis.get(f"cost:{workflow_id}") or 0)
        limit = await pg.fetchval("SELECT cost_limit_usd FROM workflows WHERE workflow_id=$1",
                                  workflow_id)
        # Worst case: max_output_tokens at output rate.
        est = self.estimate_max_cost(model, messages, tools, max_output=4096)
        if used + est > float(limit):
            raise BudgetExceededError(
                f"workflow {workflow_id}: used=${used:.4f}, est=${est:.4f}, limit=${limit:.4f}"
            )

        # 2. Make the call. max_tokens must match the worst-case estimate above.
        resp = await anthropic.messages.create(model=model, max_tokens=4096,
                                               messages=messages, tools=tools)
        actual_cost = self.compute_cost(model, resp.usage)

        # 3. Increment the counter atomically (Redis INCRBYFLOAT; no Lua needed).
        await redis.incrbyfloat(f"cost:{workflow_id}", actual_cost)
        return LLMResult(text=resp.content,
                         tool_calls=[b for b in resp.content if b.type == "tool_use"],
                         cost=actual_cost, usage=resp.usage)

The "estimate worst-case" step is what makes this a hard stop, not a soft one. If you only check after the call, a single $30 call can blow a $5 limit. By estimating max-output ahead, you fail the call rather than overspend.


7. Scaling & Bottlenecks

  1. Postgres event write throughput. 333 events/sec sustained, 2k burst. A single instance handles it; partition events by month with rolling detach for retention. tool_calls partitioned similarly.
  2. Worker concurrency vs LLM rate limits. 1,000 workflows × ~1 in-flight LLM call each = 1,000 concurrent calls. Anthropic and OpenAI both rate-limit per-org tokens-per-minute; without a token-bucket in the LLM Gateway, you'll oscillate between bursts and 429s. Use a per-model TPM/RPM bucket; queue overage rather than fail.
  3. Redis Streams as the ready queue. A single Redis can do millions of XADD/sec; not a bottleneck. Use consumer groups so multiple workers split the stream; XACK on success, XCLAIM stale messages after 60s for recovery (see the consumer-loop sketch after this list).
  4. Replay cost grows with event count. A workflow with 200 events takes longer to replay than one with 5. Snapshot every K events: persist the folded state to a snapshots table; resume reads the snapshot + tail events. Keeps replay O(K) regardless of total length.
  5. Hot tenants. One tenant runs 800 of the 1,000 active workflows; their events table partition becomes the write hot spot. Either logical partitioning by tenant_id hash, or per-tenant connection pools to spread WAL pressure.
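
The consumer-group loop from item 3, sketched with redis-py's asyncio client. Stream and group names are illustrative; the group is created once with XGROUP CREATE ready_steps workers $ MKSTREAM:

import redis.asyncio as redis

STREAM, GROUP = "ready_steps", "workers"

async def worker_loop(r: redis.Redis, consumer: str):
    while True:
        # Reclaim messages a crashed worker left pending for > 60 s.
        resp = await r.xautoclaim(STREAM, GROUP, consumer,
                                  min_idle_time=60_000, start_id="0-0")
        claimed = resp[1]   # (cursor, messages, ...) shape varies by server version
        fresh = await r.xreadgroup(GROUP, consumer, {STREAM: ">"},
                                   count=10, block=5_000)
        entries = claimed + [e for _, msgs in fresh for e in msgs]
        for msg_id, fields in entries:
            await execute_step(fields[b"workflow_id"].decode(),
                               int(fields[b"seq"]), fields[b"node"].decode())
            # Ack only after the step has checkpointed to Postgres.
            await r.xack(STREAM, GROUP, msg_id)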

8. Failure Modes & Resilience


9. Cost Analysis

Per workflow run (10 steps, mixed LLM + tool + 1 human gate):

Component                                      Cost / workflow
LLM calls (8 steps, mix of Sonnet + Haiku)     $0.10
Tool calls (2 steps, mostly internal HTTP)     $0.001
Worker compute (10 steps × 3s @ 0.5 vCPU)      $0.02
Postgres writes + 30d hot storage              $0.005
Redis ops + cost tracker                       $0.0005
Total / workflow                               ~$0.13
At 1,000 slots × 8 runs/day (~3h per run)      ~$1,000/day = ~$30k/month

The platform overhead (compute, storage, Redis) is <15% of total cost; LLM tokens dominate. Prompt caching of static tool definitions on Anthropic's API cuts the LLM line item another 30–50% for workflows with stable tool schemas — a free win once you add a cache_control marker to the tool block.
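
What that looks like against Anthropic's Messages API: put the cache_control marker on the last tool in the (stable) tool list, and the whole prefix up to that point is cached across calls. stable_tool_specs is an assumed list of this workflow's tool schemas:

resp = await anthropic.messages.create(
    model="claude-sonnet-4-5",
    max_tokens=4096,
    tools=[*stable_tool_specs[:-1],
           {**stable_tool_specs[-1],
            "cache_control": {"type": "ephemeral"}}],   # caches the full tool prefix
    messages=messages,
)
# resp.usage.cache_read_input_tokens shows how many input tokens came from cache.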


10. Tradeoffs & Alternatives

This DIY (Postgres + Redis Streams)
  Wins on:  full control over checkpoint schema, cost enforcement, ACLs, and replay semantics; no vendor lock-in.
  Loses on: you own the runtime — backpressure, retries, snapshots, observability all on you.

Temporal
  Wins on:  industry-grade workflow engine; durable workflows, signals, queries, schedules baked in; battle-tested at Uber scale.
  Loses on: self-hosting is heavyweight (Cassandra/Postgres plus history, matching, and frontend services); SDK opinions don't always fit Pythonic agent code; per-workflow cost limits are your job to wrap.

Restate
  Wins on:  lightweight Temporal alternative; single binary; durable execution + journaling.
  Loses on: younger ecosystem; smaller community; fewer integrations.

LangGraph
  Wins on:  native to the LangChain ecosystem; checkpointing built in; great for prototype-to-prod on the same code.
  Loses on: fewer persistence backends (SQLite, Postgres, Redis; no S3 history tiering); production scaling story still maturing; cost limits and ACLs not first class.

AWS Step Functions + Lambda
  Wins on:  fully managed; pay-per-execution; great for short workflows.
  Loses on: state-transition pricing spirals on long-running agentic loops; the ASL JSON DSL is awkward for dynamic agent flows; the 25k-entry execution-history limit hurts.

SQS + DIY state in DynamoDB
  Wins on:  simplest possible AWS-native build; cheap.
  Loses on: you're rebuilding Temporal poorly; no replay semantics; HITL is yours to engineer.

Decision rule. Below ~50 workflows/day, LangGraph is fine and the persistence layer matters less. From there to ~10k/day, this DIY (or LangGraph + custom persistence) gives you the cost-limit and ACL primitives you need without the operational tax of Temporal. Above 10k/day or with strict compliance / multi-tenant isolation needs, Temporal earns its weight — the durability and replay guarantees are battle-proven in a way that DIY rarely matches.


11. Common Interview Q&A



How does the platform replay a workflow without re-executing side effects?

Event-sourced state plus a side-effect ledger. The events table is append-only; replay reads events in order and folds them through the workflow definition's reducer to compute current state — no LLM or tool is re-invoked. The tool_calls table has a unique idem_key = sha256(workflow_id, seq, tool_name, canonical_request); before any tool execution, the worker tries to insert a pending row, and if the insert fails due to unique-violation, the response is read from the existing row instead of re-invoking. So even on worker crashes mid-step, the same email is never sent twice.

Why Postgres + Redis Streams instead of just Postgres or just Redis?

Different jobs. Postgres is the durable source of truth — events, tool calls, signals must survive restarts and provide cross-row transactional consistency for replay. Redis Streams is the low-latency hot path for "what's the next ready step?" — sub-millisecond push/pop with consumer groups gives you hundreds of step pickups per second without polling Postgres on every tick. With only Postgres, scheduling latency is bounded by your poll interval, and tightening that interval turns the ready-step query into constant load on an already busy DB. With only Redis, you'd lose checkpoint durability on a Redis restart. And because Redis holds nothing authoritative in this design, a lost queue is rebuildable from Postgres (sketch below). The split aligns each store with what it's good at.
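
The rebuild after a Redis loss, sketched; pg and ready_queue follow the handle conventions used in execute_step, and next_seq derivation assumes the events table is complete:

async def rebuild_ready_queue(pg, ready_queue):
    # Every durable 'running' workflow should have exactly one ready step.
    rows = await pg.fetch(
        """SELECT w.workflow_id, w.current_node,
                  COALESCE(MAX(e.seq), 0) + 1 AS next_seq
           FROM workflows w
           LEFT JOIN events e USING (workflow_id)
           WHERE w.status = 'running'
           GROUP BY w.workflow_id, w.current_node""")
    for r in rows:
        # Duplicate pushes are safe: execute_step's step_started claim dedupes.
        await ready_queue.push(r["workflow_id"], r["next_seq"], r["current_node"])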

How is the per-workflow cost limit enforced as a hard stop?

At the LLM call site, before the call leaves the gateway. The gateway reads the workflow's spend from a Redis counter, computes worst-case cost for this call (full max_output_tokens at the model's output rate plus the input tokens), and refuses with BudgetExceededError if used + worst_case > limit. The post-call increment uses the actual cost, but the pre-call check uses worst-case, so a single call can never overshoot. Concurrent calls within one workflow can race the counter, which is why the SLO reads "hard stop within 1 LLM call after breach" rather than zero overshoot. If the call goes through, the worker writes an llm_response event with the actual cost and the workflow continues.

How do human-in-the-loop pauses survive multi-day waits without leaking state?

The HITL node writes a signal row, sets workflow.status = 'waiting_signal', and returns. Nothing is in flight — no worker holds the workflow, no Redis stream entry is pending, no compute cost accrues. The scheduler explicitly ignores workflows in this status. When a signal arrives via the API, the platform writes a signal_received event, flips the workflow back to running, and pushes the next ready node onto the stream. State is in Postgres the whole time; durability is the same as any other table. A 30-day pause looks identical to a 30-second one.
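
The resume path on signal arrival, sketched with the same module-level handles as execute_step; signals.resolve and events.next_seq are assumed helpers:

async def deliver_signal(workflow_id: UUID, node_name: str, payload: dict):
    # 1. Resolve the pending signal row (sets resolved_at); reject if no gate waits.
    if not await signals.resolve(workflow_id, node_name, payload):
        raise NoPendingSignal(workflow_id, node_name)

    # 2. Append the event so replay sees the human's response.
    seq = await events.next_seq(workflow_id)
    await events.insert(workflow_id, seq, "signal_received", payload=payload)

    # 3. Flip back to running and enqueue whatever follows the gate.
    await workflows.set_status(workflow_id, "running")
    wf = await load_workflow(workflow_id)
    state = wf.definition.replay(await load_events(workflow_id))
    for nxt in wf.definition.next_after(node_name, state):
        await ready_queue.push(workflow_id, seq + 1, nxt)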

Temporal vs LangGraph vs DIY — what's the call?

LangGraph for prototypes and small-team production where you want the agent code and the runtime in the same repo — checkpointing works, but cost limits and ACLs are your job to wrap. Temporal when you need bulletproof durability, multi-language SDKs, signals/queries baked in, and you can absorb the operational weight (Cassandra or Postgres history store, matching service, frontend service). DIY (this design) when you want Temporal's primitives but with cost enforcement and tool ACLs as first-class concepts and a Pythonic SDK that fits your agent code — you're trading "more code to maintain" for "exactly the policy you need." Step Functions is a trap for agentic workflows: state-transition pricing kills you on long loops.

How does replay stay fast as workflows accumulate hundreds of events?

Snapshots. Every K events (typically 25 or 50), the platform persists the folded state to a snapshots table keyed by (workflow_id, seq). To resume or replay, the worker reads the latest snapshot and only the tail events after it — so replay is O(K) regardless of total workflow length. The events stay in place (never deleted; that would break audit), but they don't have to be re-folded from zero each time. The snapshot is just a JSON blob of the reducer's state, so it's cheap to write and read. K is tuned per definition: agentic loops with many small steps want K=10; long-pause workflows with few steps don't need snapshots at all.
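
A sketch of the snapshot write and the snapshot-aware load, assuming a snapshots(workflow_id, seq, state JSONB) table with (workflow_id, seq) unique, and a reducer that can resume from a restored state:

import json

SNAPSHOT_EVERY = 25  # K, tuned per definition

async def maybe_snapshot(pg, workflow_id, seq, state):
    if seq % SNAPSHOT_EVERY == 0:
        await pg.execute(
            """INSERT INTO snapshots (workflow_id, seq, state)
               VALUES ($1, $2, $3) ON CONFLICT (workflow_id, seq) DO NOTHING""",
            workflow_id, seq, json.dumps(state.to_dict()))

async def load_state(pg, definition, workflow_id):
    snap = await pg.fetchrow(
        "SELECT seq, state FROM snapshots WHERE workflow_id=$1 ORDER BY seq DESC LIMIT 1",
        workflow_id)
    base_seq = snap["seq"] if snap else 0
    tail = await pg.fetch(
        "SELECT * FROM events WHERE workflow_id=$1 AND seq > $2 ORDER BY seq",
        workflow_id, base_seq)
    base = definition.restore(json.loads(snap["state"])) if snap else definition.initial_state()
    return definition.replay(tail, start=base)   # folds only the tail: O(K)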
