
BullMQ vs LangGraph (HISTORICAL)

HISTORICAL — NOT THE CURRENT RUNTIME. Cinatra’s agent runtime is WayFlow, the reference OAS implementation, running as a Python sidecar invoked over A2A. The BullMQ-side content below is still accurate; the LangGraph-side content describes the prior runtime and is retained for historical reference only.

Canonical reference for the separation of concerns between the BullMQ job layer and the LangGraph agent-state-machine layer, and the migration plan for replacing Cinatra’s hand-rolled HITL state machine with native LangGraph interrupt() / Command(resume=...) primitives.

Phase 110-G complete — TypeScript agent execution layer retired. agentic-execution.ts, agentic-resume.ts, agentic-tools.ts, tool-interceptor.ts, and resume.ts have been deleted. execution.ts now dispatches exclusively to runLangGraphJob. The planned_actions and review_tasks tables have been dropped from the schema and the live DB. All HITL routing uses synthetic ID prefixes (setup-{runId} and lg-{runId}). See Migration plan → Phase D below for the completed work.

Phase 110-H complete — architecture finalized. WR-01 concurrent dispatch guard applied (updateAgentRunStatusConditional CAS before runLangGraphJob). WR-04 REST polling path now returns real inputSchema fields for setup-loop runs. IN-02 agent_update MCP tool no longer accepts "default" as executionProvider. AGENTS.md boundary rule promoted. Migration plan fully closed.

Every agent run in Cinatra passes through two different engines. Respect the boundary between them:

  • BullMQ owns scheduling, durability, retries, worker pooling, and cancellation signals. It is the trigger layer — it schedules a job, guarantees the job eventually runs, and gives operators a dashboard (/settings/operations/jobs) to see queue state.
  • LangGraph owns the agent execution graph — nodes, edges, conditional routing, shared state, and HITL interrupt/resume. It is the agent runtime layer.

The two layers meet at exactly one function: runLangGraphJob in packages/agents/src/langgraph-execution.ts. BullMQ dispatches a run to this worker; the worker creates a LangGraph thread and streams events back. No other code path should cross the boundary.

What BullMQ owns (and will continue to own after the migration):

  • Queue: cinatra-background-jobs — single queue for all background work (src/lib/background-jobs.ts, line 53).
  • Worker pool: concurrency: 4, single Worker instance per process, registered via getBackgroundJobRuntime().
  • Job names: BACKGROUND_JOB_NAMES enum (lines 9–38). AGENT_BUILDER_LANGGRAPH_EXECUTION is the agent-run entry point. Other names cover non-agent work (blog, transcript, email-outreach sub-steps, Ross, scrape, research, enrichment, pricing sync).
  • Durability: Redis persistence of job payloads, retry state, and scheduled jobs.
  • Retry / backoff: attempts, backoff options on enqueueBackgroundJob(...).
  • Cancellation: AbortController map (runtime.abortControllers) + poll-based cancellation metadata written to system_metadata. Workers check the flag every BACKGROUND_JOB_ABORT_POLL_INTERVAL_MS (750ms).
  • Observability: BullMQ Board mounted at /settings/operations/jobs/[[...slug]] (QueueDash).

What BullMQ does NOT own:

  • agent conversation state
  • HITL pause/resume semantics
  • per-step retry budgets (execution-layer concept, not a queue concept)
  • presentation-hint parsing

Removed job names (post-110-G): AGENT_BUILDER_RESUME has been removed. It was the resume entry point for the hand-rolled TS HITL loop (resume.ts, agentic-resume.ts). Do not add new code that enqueues it. The two valid HITL resume paths are AGENT_BUILDER_EXECUTION (setup loop re-entry) and AGENT_BUILDER_LANGGRAPH_EXECUTION (LangGraph resume with resume: payload).

What LangGraph owns (Phase 110-G onward):

  • Graph registry: 6 stable type graphs in packages/langgraph-agents/graphs/ (leaf_v1, proxy_v1, orchestrator_v1, parallel_v1, supervisor_v1, iterative_v1) declared in packages/langgraph-agents/langgraph.json.
  • Dispatch: runLangGraphJob resolves template.type → graphId (TYPE_TO_GRAPH map in langgraph-execution.ts, line 36) and calls client.runs.stream(thread_id, graphId, { input, streamMode: "events" }).
  • Thread lifecycle: client.threads.create() returns a thread_id, which is persisted on agent_runs.lg_thread_id (column added Phase 92).
  • Execution state: LangGraph Server persists the graph’s StateGraph inside its own Postgres instance — separate from Cinatra’s cinatra schema.
  • Event stream: LangChain callback events (on_chain_start, on_chat_model_stream, on_tool_start, on_tool_end, on_chain_end, interrupt, on_chain_error) consumed by runLangGraphJob and translated to AG-UI events via AgUiAdapter.
  • HITL interrupt/resume: interrupt() inside graph nodes; client.runs.stream(..., { command: { resume } }) for approval. The runLangGraphJob interrupt handler emits agUiAdapter.onInterrupt(...) with a synthetic lg-{runId} ID — no DB writes during interrupt emission.
  • Setup field collection: the setup interrupt loop in execution.ts passes setup-{runId} synthetic IDs and re-enters runAgentBuilderExecutionJob after each field approval; the LangGraph graph receives all collected fields in its input dict.
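
The dispatch step in the list above resolves template.type to one of the six registered graph IDs. A minimal sketch of that lookup follows. The map keys (the type names without the _v1 suffix) and the error message are assumptions made for illustration; only the six graph IDs come from the registry described above, and the real TYPE_TO_GRAPH map may be shaped differently.

```typescript
// Hypothetical reconstruction of the type-to-graph lookup.
// Only the six graph IDs are taken from the document; the keys are assumed.
type GraphId =
  | "leaf_v1"
  | "proxy_v1"
  | "orchestrator_v1"
  | "parallel_v1"
  | "supervisor_v1"
  | "iterative_v1";

const TYPE_TO_GRAPH = new Map<string, GraphId>([
  ["leaf", "leaf_v1"],
  ["proxy", "proxy_v1"],
  ["orchestrator", "orchestrator_v1"],
  ["parallel", "parallel_v1"],
  ["supervisor", "supervisor_v1"],
  ["iterative", "iterative_v1"],
]);

function resolveGraphId(template: { type: string }): GraphId {
  const graphId = TYPE_TO_GRAPH.get(template.type);
  if (graphId === undefined) {
    // Fail loudly rather than falling back to a default graph.
    throw new Error(`No graph registered for template type "${template.type}"`);
  }
  return graphId;
}
```

Using a Map rather than a plain object avoids accidental matches on inherited keys such as "toString".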

All HITL approval routing uses synthetic ID prefixes. The planned_actions and review_tasks tables are dropped.

| Prefix | Source | What it does on approval |
| --- | --- | --- |
| setup-{runId} | Setup interrupt loop in execution.ts | Merges submitted field into agent_runs.inputParams; re-enqueues AGENT_BUILDER_EXECUTION |
| lg-{runId} | LangGraph interrupt() handler in langgraph-execution.ts | Enqueues AGENT_BUILDER_LANGGRAPH_EXECUTION with resume: { values, fieldName } |
| Real UUID | Pre-migration rows (no longer exist) | Throws: “real UUID paths are not supported after Phase 110-G migration” |

Rule: Never pass a real UUID to approveReviewTaskInternal. Both interrupt emitters now pass synthetic IDs directly to agUiAdapter.onInterrupt with no DB writes.
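
The prefix routing described above can be sketched as a small pure function. This is an illustration only: routeApproval and the ApprovalRoute type are hypothetical names, not the actual body of approveReviewTaskInternal. The two prefixes, the job names, and the error text are taken from the table above.

```typescript
// Hypothetical sketch of prefix-based approval routing.
type ApprovalRoute =
  | { kind: "setup"; runId: string; job: "AGENT_BUILDER_EXECUTION" }
  | { kind: "langgraph"; runId: string; job: "AGENT_BUILDER_LANGGRAPH_EXECUTION" };

const SETUP_PREFIX = "setup-";
const LANGGRAPH_PREFIX = "lg-";

function routeApproval(reviewTaskId: string): ApprovalRoute {
  if (reviewTaskId.startsWith(SETUP_PREFIX)) {
    // Setup loop re-entry: the caller merges the field, then re-enqueues.
    return {
      kind: "setup",
      runId: reviewTaskId.slice(SETUP_PREFIX.length),
      job: "AGENT_BUILDER_EXECUTION",
    };
  }
  if (reviewTaskId.startsWith(LANGGRAPH_PREFIX)) {
    // LangGraph resume: the caller enqueues with a resume payload.
    return {
      kind: "langgraph",
      runId: reviewTaskId.slice(LANGGRAPH_PREFIX.length),
      job: "AGENT_BUILDER_LANGGRAPH_EXECUTION",
    };
  }
  // Anything without a synthetic prefix is treated as a pre-migration UUID.
  throw new Error("real UUID paths are not supported after Phase 110-G migration");
}
```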

The overlap — hand-rolled state machine (historical, now deleted)


The following files existed in the hand-rolled TS execution layer and have been deleted in Phase 110-G:

| File | Deleted in | What it owned |
| --- | --- | --- |
| agentic-execution.ts | 110-G | LLM agent loop, HitlPauseSignal catch branch |
| agentic-resume.ts | 110-G | BullMQ resume worker for agentic path |
| agentic-tools.ts | 110-G | Child agent spawner, invokeAgentAsTool |
| tool-interceptor.ts | 110-G | Risk-class map, HitlPauseSignal, budget guards |
| resume.ts | 110-G | Deterministic resume step loop |

orchestrator-execution.ts survives with its non-dispatch symbols only: cancelOrchestratorRun, TERMINAL_STATUSES, OrchestratorLedgerEntrySchema, OrchestratorLedgerSchema, buildLedgerFromChildren, resolveInstalledVersion, computeChildInput. The dispatch phase (runOrchestratorJob, enqueueChildFlow, WaitingForHumanError) was deleted in Phase 110-F.

One row per code span (function or named block) that was deleted, moved, or reduced. Listed for historical reference and to help sequence any remaining cleanup.

| Function / Span | Lines | What it does | Migration fate | Phase |
| --- | --- | --- | --- | --- |
| assertOrchestratorReady | 36–74 | Validates all declared agentDependencies are installed; orchestrator gate | KEPT | |
| Version pinning block | 127–179 | Reads agent_template_versions snapshot for run.packageVersion | KEPT | |
| Setup Interrupt Loop | 187–355 | For each required inputSchema field missing from run.inputParams: sets pending_approval + emits AG-UI INTERRUPT with synthetic setup-{runId} ID; no DB writes | KEPT (rewritten; DB writes removed in 110-G) | 110-G |
| LangGraph dispatch | 362–365 | if executionProvider: "wayflow" or "default" → runLangGraphJob | KEPT (sole dispatch branch) | |
| Unsupported provider throw | 369–371 | Throws for null or unknown executionProvider | ADDED in 110-G | 110-G |
| Orchestrator dispatch | deleted | if type === "orchestrator" → runOrchestratorJob | DELETED | 110-F |
| Agentic dispatch | deleted | if executionMode === "agentic" → runAgentBuilderAgenticJob | DELETED | 110-G |
| Deterministic step loop | deleted | Sequential step iteration with approval gates | DELETED | 110-G |

resume.ts: entire file deleted in 110-G. resumeAgentBuilderExecutionJob (BullMQ worker) is gone. LangGraph resumes via AGENT_BUILDER_LANGGRAPH_EXECUTION with resume: payload.

agentic-execution.ts, agentic-resume.ts, agentic-tools.ts, tool-interceptor.ts: each file deleted in its entirety in 110-G. All spans removed.

orchestrator-execution.ts (reduced — ~200 lines)

| Function / Span | Lines | What it does | Migration fate | Phase |
| --- | --- | --- | --- | --- |
| WaitingForHumanError class | deleted | Error for parking BullMQ job | DELETED | 110-F |
| TERMINAL_STATUSES constant | kept | Set of terminal status strings | KEPT | |
| OrchestratorLedgerEntrySchema / OrchestratorLedgerSchema | kept | Zod schemas for ledger entries | KEPT | |
| buildLedgerFromChildren | kept | Maps child run rows to ledger entries | KEPT | |
| resolveInstalledVersion | kept | Reads installed semver for a package dep | KEPT | |
| computeChildInput | kept | Returns orchestrator’s inputParams as child input | KEPT | |
| runOrchestratorJob dispatch phase | deleted | Created child runs, FlowProducer dispatch | DELETED | 110-F |
| enqueueChildFlow (FlowProducer) | deleted | BullMQ parent/child flow | DELETED | 110-F |
| runOrchestratorRollup | kept (reduced) | Aggregates terminal child statuses; audit-side write for agent_runs | KEPT | |
| cancelOrchestratorRun | kept | Fan-out cancel across child runs | KEPT | |

Symbols deleted in the migration. Listed for reference; all importers have been updated.

| Symbol | Source File | Replaced by | Phase |
| --- | --- | --- | --- |
| runAgentBuilderExecutionJob | execution.ts | Still exists; now a thin dispatcher to runLangGraphJob only | |
| resumeAgentBuilderExecutionJob | resume.ts (deleted) | runLangGraphJob with resume: payload | 110-G |
| runAgentBuilderAgenticJob | agentic-execution.ts (deleted) | Deleted | 110-G |
| runAgentBuilderAgenticResumeJob | agentic-resume.ts (deleted) | Deleted | 110-G |
| runOrchestratorJob | orchestrator-execution.ts (deleted span) | Deleted | 110-F |
| WaitingForHumanError | orchestrator-execution.ts (deleted span) | Deleted | 110-F |
| cancelOrchestratorRun | orchestrator-execution.ts | KEPT | |
| OrchestratorLedgerSchema | orchestrator-execution.ts | KEPT | |
| HitlPauseSignal | tool-interceptor.ts (deleted) | LangGraph interrupt() | 110-G |
| AGENTIC_MAX_STEPS | tool-interceptor.ts (deleted) | LangGraph recursion_limit | 110-G |
| TOOL_RISK_CLASSES | tool-interceptor.ts (deleted) | JSON artifact for Python (pending D-03) | 110-G |
| buildAgenticAgentTools | agentic-tools.ts (deleted) | Deleted | 110-G |
| BACKGROUND_JOB_NAMES.AGENT_BUILDER_RESUME | background-jobs.ts | Deleted; resume is AGENT_BUILDER_LANGGRAPH_EXECUTION with resume: data | 110-G |

runAgentBuilderExecutionJob (BullMQ worker)
├─ version pinning (packageVersion → readAgentTemplateVersionBySemver)
├─ assertOrchestratorReady (orchestrator dep-check)
├─ Setup Interrupt Loop (emits INTERRUPT per pending inputSchema field — no DB writes)
└─ Dispatch:
   ├─ executionProvider: "wayflow" | "default" → runLangGraphJob (only valid path)
   └─ anything else → throws "Unsupported executionProvider after 110-G migration: ..."

All other dispatch branches (orchestrator, agentic, deterministic step loop) have been deleted.
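
The final dispatch branch in the tree above reduces to a two-way check. A minimal sketch, assuming a hypothetical helper named selectDispatch; the real code in execution.ts is inline and may differ, but the accepted values and the error prefix are the ones shown in the tree.

```typescript
// Hypothetical helper; execution.ts performs this check inline.
function selectDispatch(
  executionProvider: string | null | undefined,
): "runLangGraphJob" {
  // "default" is the transitional alias that legacy rows may still carry.
  if (executionProvider === "wayflow" || executionProvider === "default") {
    return "runLangGraphJob";
  }
  // null, undefined, and unknown strings all land here.
  throw new Error(
    `Unsupported executionProvider after 110-G migration: ${String(executionProvider)}`,
  );
}
```

Throwing for null as well as for unknown strings means a template with no provider set fails at dispatch time instead of silently doing nothing.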

HITL surface — current state (post-110-G)

Setup field collection (setup-{runId}):

  1. execution.ts setup interrupt loop detects a pending required inputSchema field.
  2. Sets agent_runs.status → pending_approval. Constructs synthetic ID setup-{runId}.
  3. Calls agUiAdapter.onInterrupt(fieldSchema, xRenderer, inputParams, syntheticId, fieldName), with no DB writes.
  4. Human submits field value → approveReviewTaskInternal routes on setup- prefix → merges value into agent_runs.inputParams → re-enqueues AGENT_BUILDER_EXECUTION.
  5. runAgentBuilderExecutionJob re-enters, re-evaluates pending fields, either emits next INTERRUPT or falls through to runLangGraphJob.

LangGraph interrupt (lg-{runId}):

  1. LangGraph node calls interrupt({ toolName, args, provenance }).
  2. LangGraph Server persists thread state and emits an interrupt event on the SSE stream.
  3. runLangGraphJob sees the interrupt event → constructs synthetic ID lg-{runId} → emits agUiAdapter.onInterrupt(...), with no DB writes.
  4. Human clicks Approve → approveReviewTaskInternal routes on lg- prefix → enqueues AGENT_BUILDER_LANGGRAPH_EXECUTION with resume: payload.
  5. runLangGraphJob calls client.runs.stream(thread_id, graphId, { command: { resume: values } }). No synthetic messages, no re-running of prior tools.
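
The setup-loop steps above (find the next required field that is still missing, build the synthetic ID, hand the field to the interrupt emitter) can be sketched as a pure function. The names nextSetupInterrupt, InputSchema, and PendingInterrupt are hypothetical, and treating undefined or null as "missing" is an assumption; the actual loop in execution.ts may apply a different test.

```typescript
// Hypothetical shapes; the real inputSchema is a JSON-Schema-like object.
interface InputSchema {
  required?: string[];
  properties?: Record<string, unknown>;
}

interface PendingInterrupt {
  syntheticId: string; // always setup-{runId}, never a DB row ID
  fieldName: string;
  fieldSchema: unknown;
}

function nextSetupInterrupt(
  runId: string,
  inputSchema: InputSchema,
  inputParams: Record<string, unknown>,
): PendingInterrupt | null {
  for (const fieldName of inputSchema.required ?? []) {
    const value = inputParams[fieldName];
    // Assumption: a field counts as pending when it is undefined or null.
    if (value === undefined || value === null) {
      return {
        syntheticId: `setup-${runId}`,
        fieldName,
        fieldSchema: inputSchema.properties?.[fieldName] ?? {},
      };
    }
  }
  // Nothing pending: the caller falls through to runLangGraphJob.
  return null;
}
```

Because the function is re-evaluated from scratch on every re-entry, the only state carried between approvals is agent_runs.inputParams, which matches the "no DB writes during interrupt emission" rule.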

The key invariants preserved:

  • agent_runs.status transitions (running → pending_approval → running → completed/failed/stopped) stay byte-identical so existing UI code (polling route + AgenticRunPanel) keeps working.
  • AG-UI event shape stays byte-identical (onInterrupt, onRunFinished, onToolCallStart/End) so the Tier 1 renderer and chat useAgUiRunStream keep working.

What stays outside LangGraph (BullMQ and TypeScript):

  • Non-agent background jobs: blog post generation, transcripts, email outreach per-draft workers, Apify scraping, LiteLLM pricing sync. These are short-lived deterministic jobs with no state machine; BullMQ’s retry + durability is the right primitive.
  • Scheduled work: anything triggered on a cron. LangGraph has no native scheduler.
  • Agent-run dispatch: AGENT_BUILDER_LANGGRAPH_EXECUTION stays as the single entry point into runLangGraphJob.
  • packages/llm-orchestration permanent components (registry.ts, mcp-access.ts, telemetry.ts, tools/skills.ts) — used by TypeScript connector packages (email outreach, blog generation) independently of the agent execution layer.
  • packages/connector-* — all connectors stay TypeScript.
  • MCP server routes (/api/mcp) — TS owns the MCP surface.
  • All Next.js routes and server actions.

BullMQ trigger → LangGraph run (post-migration runLangGraphJob entry)

// packages/agents/src/langgraph-execution.ts
export async function runLangGraphJob(
  data: { runId: string; resume?: { approved: boolean; values?: unknown } },
  jobId: string,
): Promise<void> {
  const { runId, resume } = data;
  const run = await readAgentRunById(runId);
  if (!run) throw new Error(`Run ${runId} not found`);
  const template = await readAgentTemplateById(run.templateId);
  const graphId = resolveGraphId(template);
  const thread_id = run.lgThreadId ?? (await client.threads.create()).thread_id;
  if (!run.lgThreadId) await updateAgentRunLgThreadId(runId, thread_id);
  if (resume) {
    // Resume a paused graph: Command(resume=...) is the single path
    for await (const event of client.runs.stream(
      thread_id,
      graphId,
      { command: { resume } }
    )) {
      await handleLangGraphEvent(event, run, agUiAdapter);
    }
  } else {
    // Fresh run
    for await (const event of client.runs.stream(
      thread_id,
      graphId,
      { input: buildGraphInput(run) }
    )) {
      await handleLangGraphEvent(event, run, agUiAdapter);
    }
  }
}

Enqueue a LangGraph resume from an approval action

// packages/agents/src/review-task-actions.ts (post-110-G shape)
await enqueueBackgroundJob(BACKGROUND_JOB_NAMES.AGENT_BUILDER_LANGGRAPH_EXECUTION, {
  runId: run.id,
  resume: { values, fieldName },
});

Python graph with interrupt() and PostgresSaver checkpointer

# packages/langgraph-agents/graphs/leaf_v1.py (post-migration shape)
import os

from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command

# LeafState (the graph's state schema) and agent_loop_node are assumed to be
# defined elsewhere in this module.


def build_graph(config: dict):  # noqa: ARG001
    builder = StateGraph(LeafState)
    builder.add_node("setup_collector", setup_collector_node)
    builder.add_node("agent_loop", agent_loop_node)
    builder.set_entry_point("setup_collector")
    builder.add_edge("setup_collector", "agent_loop")
    # Note: recent langgraph-checkpoint-postgres releases may expose
    # from_conn_string as a context manager; adapt this line if so.
    checkpointer = PostgresSaver.from_conn_string(os.environ["SUPABASE_DB_URL"])
    # checkpointer.setup() is a one-time DDL migration — run separately, not on every server restart:
    # python -c "import os; from langgraph.checkpoint.postgres import PostgresSaver; PostgresSaver.from_conn_string(os.environ['SUPABASE_DB_URL']).setup()"
    return builder.compile(checkpointer=checkpointer)


def setup_collector_node(state: LeafState, config: RunnableConfig):
    """Single setup node replaces execution.ts lines 193-293 interrupt loop."""
    for field in state["inputSchema"]["required"]:
        if field not in state["collectedInputs"]:
            # interrupt() pauses the graph here; on resume it returns the submitted value.
            value = interrupt({"field": field, "schema": state["inputSchema"]})
            state["collectedInputs"][field] = value
    return state

Four phases to move from the hand-rolled state machine to native LangGraph HITL while preserving the existing audit surface and UI contract.

Status as of Phase 110-G: Phases A through D are complete. The TypeScript agent execution layer has been retired.

Mapping to ROADMAP sub-phases: Phase A = 110-C; Phase B = 110-D + 110-E + 110-F; Phase C = 110-F (orchestrator leg); Phase D = 110-G. Phase 110-B audited and extended this document. Phase 110-H finalises the post-migration architecture.

| Doc phase | ROADMAP sub-phase | Primary deletion target | Status |
| --- | --- | --- | --- |
| A (HITL pilot) | 110-C | tool-interceptor.ts throw path, execution.ts interrupt loop | Complete |
| B (agentic loop) | 110-D + 110-E | agentic-execution.ts, agentic-resume.ts, agentic-tools.ts | Complete |
| B (resume leg) | 110-F | resume.ts, agentic-resume.ts | Complete |
| C (orchestrator) | 110-F (orchestrator leg) | orchestrator-execution.ts, FlowProducer wiring | Complete |
| D (deterministic) | 110-G | execution.ts step loop, WaitingForHumanError, planned_actions + review_tasks table drop | Complete |
| | 110-H | Post-migration audit + AGENTS.md rule | Complete |

Phase A — HITL inside a graph (complete)


Wired interrupt() inside leaf_v1 and extended runLangGraphJob interrupt handler to emit AG-UI HITL events.

Phase B — Agentic loop to LangGraph (complete)


agentic-execution.ts, agentic-resume.ts, agentic-tools.ts, tool-interceptor.ts deleted.

Phase C — Orchestrator to subgraph dispatch (complete)


runOrchestratorJob, enqueueChildFlow, and WaitingForHumanError deleted from orchestrator-execution.ts. The file survives with its non-dispatch symbols only.

Phase D — Deterministic step loop retirement (complete, Phase 110-G)


Completed work:

  1. Deleted execution.ts deterministic step loop (lines 326–534 in the pre-110-G file).
  2. Deleted resume.ts entirely, removing the resumeAgentBuilderExecutionJob worker.
  3. Removed DB writes from the setup interrupt loop — planned_actions and review_tasks rows are no longer created during interrupt emission. Synthetic setup-{runId} ID is passed directly to agUiAdapter.onInterrupt.
  4. Dropped planned_actions and review_tasks tables from schema and live DB.
  5. Updated approveReviewTaskInternal in review-task-actions.ts to route exclusively on synthetic ID prefixes (setup-{runId}, lg-{runId}). Real UUID path throws.
  6. Added explicit throw in execution.ts for null or unknown executionProvider after setup loop.
  7. AGENT_BUILDER_RESUME removed from BACKGROUND_JOB_NAMES.

Post-110-G invariant: execution.ts is a ~372-line BullMQ entry function that delegates exclusively to runLangGraphJob. Every agent template must have executionProvider: "wayflow" set.

Post-110-H invariant: The concurrent dispatch race window is closed — updateAgentRunStatusConditional(runId, "queued", "running") is the atomic CAS gate inside runAgentBuilderExecutionJob immediately before the runLangGraphJob dynamic import. A second BullMQ retry that dequeues the same job will fail the CAS and return early without entering runLangGraphJob. The agent_update MCP tool no longer accepts "default" as an executionProvider value (IN-02). Legacy DB rows that still carry "default" continue to route correctly via the runtime alias || template.executionProvider === "default" in execution.ts — the two concerns are separate. The REST polling route /runs/[runId] now returns the actual template.inputSchema in the setup-fallback hitlContext instead of {} (WR-04), so REST polling clients have field metadata to render HITL approval forms.
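
The CAS gate described above can be illustrated with an in-memory stand-in for the agent_runs table. This is a sketch under stated assumptions: the boolean return value of updateAgentRunStatusConditional and the dispatchOnce wrapper are hypothetical, and a real implementation would presumably rely on a single conditional UPDATE (matching on both run ID and expected status) rather than a Map.

```typescript
type RunStatus =
  | "queued"
  | "running"
  | "pending_approval"
  | "completed"
  | "failed"
  | "stopped";

// In-memory stand-in for agent_runs.status, keyed by run ID.
const runStatuses = new Map<string, RunStatus>();

// Assumed signature: resolves true only for the caller that performed the transition.
async function updateAgentRunStatusConditional(
  runId: string,
  expected: RunStatus,
  next: RunStatus,
): Promise<boolean> {
  // Check and write happen with no await in between, so they are atomic here.
  if (runStatuses.get(runId) !== expected) return false;
  runStatuses.set(runId, next);
  return true;
}

// Hypothetical wrapper showing where the gate sits relative to the dispatch.
async function dispatchOnce(
  runId: string,
  runJob: (runId: string) => Promise<void>,
): Promise<boolean> {
  const claimed = await updateAgentRunStatusConditional(runId, "queued", "running");
  if (!claimed) return false; // another worker already owns this run
  await runJob(runId);
  return true;
}
```

When two workers call dispatchOnce for the same run concurrently, only the first compare-and-set succeeds, so the job body runs once.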

Database tables after the migration:

  • agent_runs: unchanged. lg_thread_id column (Phase 92) is load-bearing.
  • agent_templates: the execution_mode column is advisory; the execution_provider value "default" is a transitional alias only.
  • agent_run_messages — retained for chat-history display, populated by LangChain message callbacks.
  • agent_template_versions — unchanged. Version pinning still runs in the TS dispatcher before graph input is built.
  • audit_events — unchanged.
  • planned_actions — dropped. Was the execution-state store for the hand-rolled HITL loop. Audit-projection role (110-D) was transitional; the authoritative “what is this run paused on?” state lives in the LangGraph checkpointer.
  • review_tasks — dropped. Was the UI-facing HITL approval queue. Approval routing now uses synthetic ID prefixes; the LangGraph checkpointer is the authoritative pause state.

Do not re-create these tables. Do not add new code that reads or writes them.

Cancellation is the one place where the two layers meaningfully interact after the migration.

  • BullMQ cancellation (cancelBackgroundJob(jobId)) continues to be the operator-facing kill switch. It writes to the background_job_cancellation_requests metadata key.
  • runLangGraphJob MUST poll AbortController.signal.aborted every 750ms (same pattern as all other workers). On abort it must:
    1. Call client.runs.cancel(thread_id, run_id) to stop the LangGraph execution.
    2. Transition agent_runs.status → stopped via updateAgentRunStatus.
    3. Emit AgUiAdapter.onRunFinished("stopped") for UI closure.
  • Server-side timeout (run.timeoutSeconds) is enforced in runLangGraphJob around the stream loop. LangGraph itself does not know about this timeout — it’s a BullMQ-worker concern.

Do not push the BullMQ abort signal into the LangGraph graph as a LangGraph-native primitive. Keep the kill switch at the worker boundary.
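
The three abort steps above can be sketched as a wrapper around the event loop. Everything here is illustrative: consumeWithKillSwitch and the AbortDeps interface are hypothetical, with the dependencies injected so the sketch stays self-contained. Note one simplification: the signal is checked between events rather than on a 750ms timer, so a stream that goes quiet would still need the timer-based poll the rule above calls for.

```typescript
// Hypothetical injected dependencies standing in for the real calls.
interface AbortDeps {
  cancelLangGraphRun: (threadId: string, lgRunId: string) => Promise<void>; // e.g. wraps client.runs.cancel
  updateAgentRunStatus: (runId: string, status: "stopped") => Promise<void>;
  onRunFinished: (outcome: "stopped") => void; // e.g. AgUiAdapter.onRunFinished
}

async function consumeWithKillSwitch<E>(
  events: AsyncIterable<E>,
  signal: AbortSignal,
  ids: { runId: string; threadId: string; lgRunId: string },
  handle: (event: E) => Promise<void>,
  deps: AbortDeps,
): Promise<"finished" | "stopped"> {
  for await (const event of events) {
    if (signal.aborted) {
      // 1. Stop the LangGraph execution.
      await deps.cancelLangGraphRun(ids.threadId, ids.lgRunId);
      // 2. Record the terminal status.
      await deps.updateAgentRunStatus(ids.runId, "stopped");
      // 3. Close the UI stream.
      deps.onRunFinished("stopped");
      return "stopped";
    }
    await handle(event);
  }
  return "finished";
}
```

The graph itself never sees the AbortSignal; the worker observes it and translates it into a cancel call, which keeps the kill switch at the worker boundary.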

Anti-patterns (do not do any of the following):

  • Adding any code that enqueues AGENT_BUILDER_RESUME: the job no longer exists.
  • Referencing planned_actions or review_tasks tables in new code — they are dropped.
  • Writing DB rows during interrupt emission (setup loop or LangGraph interrupt handler) — both paths now use synthetic IDs and no DB writes.
  • Passing real UUIDs to approveReviewTaskInternal — only setup-{runId} and lg-{runId} prefixes are valid after Phase 110-G.
  • Adding new code paths to execution.ts beyond the setup interrupt loop and LangGraph dispatch — all new agent execution logic belongs in LangGraph graphs or in runLangGraphJob.
  • Pushing BullMQ cancellation into the graph as a LangGraph primitive — keep the kill switch at the worker boundary.
  • Building a second queue alongside cinatra-background-jobs — a single queue with job-name routing is sufficient for v1.
  • Using client.threads.create() outside of runLangGraphJob — thread creation and lifecycle belongs in one place.
  • Running LangGraph Server in the same process as Next.js — LangGraph Server is a separate service (docker compose up in packages/langgraph-agents).
  • Passing executionProvider: "default" to the agent_update MCP tool: it accepts "wayflow" only (IN-02). Legacy DB rows that still carry "default" continue to route correctly via the runtime alias in execution.ts; do not conflate MCP input validation with runtime dispatch logic.
  • Calling runLangGraphJob from runAgentBuilderExecutionJob without a preceding updateAgentRunStatusConditional CAS — two concurrent BullMQ retries can both pass the run.status !== "queued" read-check at line 92 before either writes anything; the CAS is the only serialization point.
  • Calling orchestrateGenerate from a LangGraph graph — use LangChain model classes (ChatOpenAI, ChatAnthropic) inside the graph; reserve orchestrateGenerate for non-agent LLM work inside TypeScript connector packages.

Related documents:

  • docs/ai/llm-orchestration.md: orchestration layer that stays TypeScript.
  • docs/ai/mcp-patterns.md — MCP primitive conventions used by both TS and Python layers.
  • docs/ai/agent-development.md — authoring new agent graphs.
  • packages/langgraph-agents/README.md — LangGraph Server setup and deployment.
  • .planning/ROADMAP.md Phase 999.8 — TypeScript agent execution layer retirement (the terminal cleanup phase after the migration is done).