Skip to content

LangGraph Graphs — Cinatra Agent Patterns (HISTORICAL)

HISTORICAL — NOT THE CURRENT RUNTIME. Cinatra’s agent runtime is WayFlow, the reference OAS implementation, running as a Python sidecar invoked over A2A. This document describes the prior LangGraph-based runtime and is retained for reference only. For the current architecture see open standards in Cinatra and architecture.

Reference for building and extending LangGraph agent graphs in packages/langgraph-agents/. Read alongside bullmq-vs-langgraph.md (layer boundary) and llm-orchestration.md (provider abstraction).


Graph IDFileWhen to use
leaf_v1graphs/leaf_v1.pySingle bounded task, one LLM call, no HITL
proxy_v1graphs/proxy_v1.pyMultiple tightly-coupled steps in one session, single HITL surface
iterative_v1graphs/iterative_v1.pyRepeated worker+critic refinement loop until quality threshold
supervisor_v1graphs/supervisor_v1.pyLLM dynamically picks the next specialist; loop continues until goal met
parallel_v1graphs/parallel_v1.pyFan-out to multiple workers, combine results
orchestrator_v1graphs/orchestrator_v1.pyMulti-stage workflow with mid-run HITL gates driven by approval_policy.steps

All six graphs share the same entry point (setup_collector) and the same LLM execution pattern (run_cinatra_llm_step). They differ only in topology — number of nodes, edges, and whether they contain HITL gates.


At runtime the MCP server must be injected as a native provider connector, not as 250+ individual function tools. The agent’s SKILL.md (compiled into task_spec) tells the LLM which tools to call and when. The LLM decides which tools to use; runtime does not filter the toolset.

Native MCP connector (required — no individual tool expansion)

Section titled “Native MCP connector (required — no individual tool expansion)”

langchain-mcp-adapters converts each MCP primitive into an individual LangChain function tool. Do not use this path — the Cinatra MCP server has 250+ primitives and OpenAI’s API limit is 128 individual function tools.

The correct implementation is _build_provider_native_mcp_config(provider, url, bearer_token) in cinatra_sdk/mcp.py, provider-specific:

ProviderShapeNotes
OpenAI{"type": "mcp", "server_label": "cinatra", "server_url": URL, "require_approval": "never", "headers": {"Authorization": "Bearer TOKEN"}}Verify ChatOpenAI.bind_tools() passes raw dicts through without converting to function schema — not confirmed in LangChain docs
Anthropicmcp_servers=[{"type": "url", "name": "cinatra", "url": URL, "authorization_token": TOKEN}] on ChatAnthropic + tools=[{"type": "mcp_toolset", "mcp_server_name": "cinatra"}] + beta header mcp-client-2025-11-20Header version matters — 2025-04-04 is deprecated
GeminiProvider has beta MCP (https://ai.google.dev/gemini-api/docs/interactions) but langchain-google-genai may not expose it yetKeep expansion fallback, mark as temporary

LangChain/LangGraph has no generic unified MCP pass-through. Each provider requires separate config. The main Cinatra app (packages/llm-orchestration) already handles this correctly in TypeScript via withMcpServerTool — the Python layer must replicate the same pattern.


Every node that does LLM work calls run_cinatra_llm_step — no exceptions, no shortcuts. This delegates to the TypeScript runResolvedSkillAwareDeterministicLlmTask, which handles MCP injection, skill delivery (SKILL.md via shell tool), max_steps enforcement, and cost tracking in a single call without a tunnel hop.

async def _run_stage(state: SomeState) -> dict:
from cinatra_sdk.llm_step import run_cinatra_llm_step
task_spec = state.get("task_spec") or ""
prompt = state.get("prompt") or task_spec
# The TS layer sends prompt = task_spec + "\n\nWorkflow inputs:\n..."
# Strip the task_spec prefix so it isn't sent twice.
if task_spec and prompt.startswith(task_spec):
user_content = prompt[len(task_spec):].lstrip("\n") or prompt
else:
user_content = prompt
output = await run_cinatra_llm_step(
run_config=dict(state),
user=user_content,
skill_ids=state.get("skill_ids") or [],
system=task_spec,
max_steps=6,
)
return {"output": output}

What run_cinatra_llm_step does:

  • Posts to <origin>/api/llm-bridge with the graph’s bearer_token.
  • Derives origin from state["a2a_base_url"] (stripping the /api/a2a suffix) or falls back to state["mcp_server_url"].
  • The TS bridge delegates to runResolvedSkillAwareDeterministicLlmTask, which injects the MCP server tool, loads SKILL.md via the local shell tool, and enforces max_steps (server-side cap: 20).
  • Returns the LLM’s final text, or "" on null output.
  • Raises ValueError immediately if bearer_token or base_url is missing — fail-fast, not a silent HTTP error.

Required state fields: bearer_token and either a2a_base_url or mcp_server_url must be declared in the graph’s TypedDict and populated by the TS execution layer.

Zero hardcoded tool names. The LLM receives all MCP tools and decides which to call based on task_spec. Agent graphs must never call specific MCP tool names directly — that is the LLM’s job.

Legacy pattern (deprecated): build_cinatra_node() + create_deep_agent() from deepagents was the old approach. Do not use it in new graphs — it bypasses the TS orchestration layer and runs a separate ReAct loop inside the container.


Every graph declares its own TypedDict state. All graphs carry the same base fields:

class SomeState(TypedDict, total=False):
# TS layer sends these
prompt: str # task_spec + workflow inputs, combined by TS execution layer
task_spec: str # agent instructions — used as system prompt
skill_ids: list # skill IDs to load
bearer_token: str # Cinatra server-to-server auth token for MCP calls
provider: str # openai | anthropic | gemini
openai_api_key: str
anthropic_api_key: str
gemini_api_key: str
# Standard output
output: str # final text output, read by TS runner
# Setup collector fields
input_schema: dict # forwarded from graphInput; drives setup_collector interrupts
collected_inputs: dict # accumulates approved field values during setup phase
input_params: dict # merged final values after setup_collector completes

Pitfall P8 — undeclared fields are dropped by the checkpointer. Any field written by a node must be declared in the TypedDict. Undeclared fields survive during a single in-memory run but are silently dropped on resume after an interrupt() because LangGraph’s checkpointer serializes only declared fields. Always declare extension fields.


Every graph uses setup_collector_node as its first node. It iterates input_schema.required, calls interrupt() for each missing field, and accumulates values into collected_inputs and input_params.

def build_graph(config: dict):
from cinatra_sdk.setup_collector import setup_collector_node, make_checkpointer
graph = StateGraph(SomeState)
graph.add_node("setup_collector", setup_collector_node)
# ... other nodes ...
graph.set_entry_point("setup_collector")
graph.add_edge("setup_collector", "next_node")
return graph.compile(checkpointer=make_checkpointer())

make_checkpointer() returns a PostgresSaver when SUPABASE_DB_URL is set, otherwise MemorySaver. The checkpointer is required — interrupt() cannot persist graph state without it.


build_hitl_gate — fixed-step gate factory

Section titled “build_hitl_gate — fixed-step gate factory”

cinatra_sdk/hitl.py exports build_hitl_gate, which produces an async node that:

  1. Skips if rejected_at is already set (reject propagation).
  2. Looks up approval_policy.steps for the given stepNumber.
  3. Skips if requiresApproval is false (no-op gate — allows the same graph to run with or without human approval).
  4. Calls interrupt(payload) with xRenderer, schema, and values.
  5. On resume: records approval or sets rejected_at.
hitl_step_2 = build_hitl_gate(
step_number=2,
x_renderer="@cinatra-agents/some-agent:output",
schema_builder=lambda s: {"type": "object", "properties": {"approved": {"type": "boolean"}}, "required": ["approved"]},
values_builder=lambda s: {"someField": s.get("some_field")},
)
graph.add_node("hitl_step_2", hitl_step_2)

Use this factory for graphs with a known, fixed number of HITL steps declared at compile time.

{
"steps": [
{
"stepNumber": 2,
"requiresApproval": true,
"xRenderer": "@cinatra-agents/some-agent:output",
"description": "Review generated recipients"
},
{
"stepNumber": 3,
"requiresApproval": false // gate wired but skipped at runtime
}
]
}

approval_policy arrives in graphInput from the TS layer. Graphs must read it from state — never hardcode requiresApproval.

Security rule R-07 — no credentials through interrupt

Section titled “Security rule R-07 — no credentials through interrupt”

The values_builder lambda passed to build_hitl_gate (and any custom HITL node) must only select domain data from state. bearer_token, openai_api_key, anthropic_api_key, gemini_api_key, and any other server credential must never appear in the interrupt payload. The UI receives interrupt values — they must be safe to display.

Pitfall P1 — one interrupt() per node execution

Section titled “Pitfall P1 — one interrupt() per node execution”

LangGraph replays all code before the first interrupt() call on every resume. Calling interrupt() inside a Python for/while loop within a single node creates infinite re-entry. Use graph-level looping (conditional edges) instead — never interrupt inside a Python loop.


Orchestrator loop pattern (orchestrator_v1)

Section titled “Orchestrator loop pattern (orchestrator_v1)”

orchestrator_v1 differs from the other five graphs because the number of stages and HITL gates is determined at runtime from approval_policy.steps, not at compile time. The solution is a loop topology using conditional edges:

setup_collector → run_stage ⟶ hitl_gate
↑ |
└──────────────┘ (while steps remain and not rejected)
finalize → END
approval_policy: dict # from graphInput
current_step_index: int # loop cursor; incremented by hitl_gate
step_outputs: list # accumulated outputs from each run_stage call
approvals: dict # {step_number: {approved: bool, approvedAt: str}}
rejected_at: str # "step_N" — first rejection; terminates loop
status: str # "completed" | "rejected" — read by TS runner

run_stage

  • Reads current_step_index to know which step to execute.
  • Returns early with {} if rejected_at is set or index is out of bounds.
  • Builds user content from: base prompt + step description (from approval_policy.steps[idx].description) + previous step outputs for context.
  • Calls run_cinatra_llm_step(run_config=dict(state), user=user_content, max_steps=4) — same bridge pattern as all other graphs, with a tighter max_steps since each orchestrator step is one reasoning unit.
  • Appends result to step_outputs.

hitl_gate

  • Reads approval_policy.steps[current_step_index].
  • If requiresApproval is false: increments current_step_index, returns.
  • If requiresApproval is true: calls interrupt(payload) with step’s xRenderer and last stage output as values.
  • On approval: records in approvals, increments current_step_index.
  • On rejection: sets rejected_at, increments current_step_index (loop terminates via conditional).

finalize (terminal node)

  • Returns {"status": "rejected"} if rejected_at is set.
  • Returns {"status": "completed", "output": joined step outputs} on success.
  • TS runner maps "rejected"updateAgentRunStatus(runId, "stopped").

_should_continue (conditional router after hitl_gate)

def _should_continue(state) -> str:
if state.get("rejected_at"):
return "finalize"
steps = (state.get("approval_policy") or {}).get("steps") or []
if (state.get("current_step_index") or 0) < len(steps):
return "run_stage"
return "finalize"

The graph topology (nodes and edges) is fixed at compile time. The loop is implemented via add_conditional_edges — exactly the documented LangGraph pattern for dynamic loops. interrupt() fires at most once per hitl_gate execution (never inside a Python loop), satisfying Pitfall P1. State is fully declared in the TypedDict, satisfying Pitfall P8.


ModulePurpose
llm_step.pyrun_cinatra_llm_step()Primary LLM bridge — posts to /api/llm-bridge; used by every LLM node
hitl.pybuild_hitl_gate()Factory for fixed-step HITL gate nodes
setup_collector.pysetup_collector_node, make_checkpointer()Setup interrupt loop; mandatory entry point for all graphs
child_agent.pyinvoke_child_agent_via_a2a()A2A child-agent dispatch (orchestrator_v1 only — childAgent branch)
mcp.pyget_cinatra_mcp_tools()Loads all MCP tools from the Cinatra server; URL from CINATRA_MCP_SERVER_URL env
node.pybuild_cinatra_node()Legacy: assembles model + MCP tools; do not use in new graphs
skills.pyCinatraSkillBackendFuture skills middleware (currently a no-op backend)
routing.pyShared conditional routing helpers
subgraph.pyUtilities for composing subgraphs (future)

The LangGraph container (packages/langgraph-agents/Dockerfile.dev) bakes code at build time — no volume mounts. After any code change, rebuild the image.

Terminal window
packages/langgraph-agents/start-dev.sh

start-dev.sh reads BETTER_AUTH_URL from .env.local, converts localhosthost.docker.internal, and passes the resulting /api/mcp URL as CINATRA_MCP_SERVER_URL to the container. The default URL in mcp.py has no port (port 80) — production safe.

The server runs with --no-reload to prevent watchfiles from restarting the container every 10 seconds when .langgraph_api/.langgraph_ops.pckl is written.


  1. Create graphs/my_graph_v1.py — declare a TypedDict state extending the base fields (including bearer_token and a2a_base_url), implement async node functions using run_cinatra_llm_step(), implement build_graph(config).
  2. Register in langgraph.json under graphs.
  3. Register in packages/agents/src/langgraph-execution.ts TYPE_TO_GRAPH map.
  4. Rebuild the Docker image.

Do not add hardcoded MCP tool names or hardcoded step counts. Let task_spec drive the LLM, and let approval_policy.steps drive HITL.