LangGraph Integration
Graph-based agents have a control-flow problem. The moment your agent needs to decide "should I call another tool or answer?", "is this action safe to execute?", or "which past turn connects to what the user just said?", the routing logic creeps into your Python code. Before long, business rules live in if statements scattered across nodes, and every new condition means another code change.
InputLayer's LangGraph integration moves that logic into rules. You store facts, write rules that derive conclusions about what's true, and your graph queries those conclusions to decide where to go next. The graph still runs the steps. The rules decide what the steps mean.
This guide walks through the integration from install to resumable agents with semantic memory. Each section builds on the previous one and introduces just enough IQL (InputLayer's query language) to keep going.
Prerequisites
You need two things to run any example in this guide.
- A running InputLayer server. The fastest way is Docker:

  ```shell
  docker run -p 8080:8080 ghcr.io/inputlayer/inputlayer:latest
  ```

  The server listens for WebSocket connections at `ws://localhost:8080/ws`.
- Python 3.10 or newer. Check with `python --version`.
Confirm the server is reachable before continuing:
```shell
curl http://localhost:8080/health && echo "server is up"
```
Installation
```shell
pip install "inputlayer[langgraph]"
```
This pulls in langgraph and langchain-core alongside the InputLayer SDK.
Quick start
The smallest useful program. It stores a fact, writes a rule, and lets a kg_router decide which branch to take:
```python
import asyncio

from inputlayer import InputLayer
from inputlayer.integrations.langgraph import InputLayerState, kg_router
from langgraph.graph import END, StateGraph


class State(InputLayerState):
    question: str
    answer: str


async def ask(state):
    # Pretend an LLM answered. In a real app, call your model here.
    return {"answer": "blue"}


async def say_hi(state):
    return {"answer": "hi"}


graph = StateGraph(State)
graph.add_node("ask", ask)
graph.add_node("say_hi", say_hi)
graph.set_conditional_entry_point(
    kg_router(
        branches={"ask": "?is_question(X)", "say_hi": "?is_greeting(X)"},
        default="ask",
    ),
)
graph.add_edge("ask", END)
graph.add_edge("say_hi", END)
app = graph.compile()


async def main():
    async with InputLayer("ws://localhost:8080/ws",
                          username="admin", password="admin") as il:
        kg = il.knowledge_graph("qs_langgraph")
        # Declare the two relations, then insert a single fact.
        await kg.execute("+is_question(x: string)")
        await kg.execute("+is_greeting(x: string)")
        await kg.execute('+is_question("q1")')
        result = await app.ainvoke({"kg": kg, "question": "what color?", "answer": ""})
        print(result["answer"])  # "blue"


asyncio.run(main())
```
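That's the whole idea. A fact (+is_question("q1")) drives a routing decision. Retract it and insert an is_greeting fact instead, and the router sends the graph to say_hi; with neither fact present, the default branch (ask) wins. No Python if statement changes.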
The building blocks
The integration exports six primitives. Each plugs into LangGraph at a specific seam:
| Primitive | What it does |
|---|---|
| InputLayerState | A TypedDict base that carries the KG handle through your graph state |
| kg_node | A factory for query, insert, or delete graph nodes |
| kg_router | A conditional-edge function driven by IQL queries |
| InputLayerCheckpointer | Persists graph state in a KG so runs can resume across processes |
| InputLayerMemory | Semantic long-term memory with rule-derived conversation context |
| escape_iql | String escaping for safe IQL interpolation in parameterized queries |
You don't need all six to get started. The most common pattern uses InputLayerState, kg_node, and kg_router to let rules drive the graph. InputLayerCheckpointer and InputLayerMemory are independent additions you layer in when you need them.
Part 1: Rules-driven routing
The core idea is simple. Your graph has nodes that do work. Between nodes, you need to decide what to do next. Instead of writing that decision in Python, write it as an IQL query and let the rules answer.
State that carries the KG
First, define your state. Extend InputLayerState so the KG handle flows through your graph:
```python
from inputlayer.integrations.langgraph import InputLayerState


class AgentState(InputLayerState):
    question: str
    findings: dict  # filled by a query kg_node: columns, rows, row_count
    answer: str
```

InputLayerState requires a kg field. When you invoke the graph, pass the KG handle in the initial state:

```python
await app.ainvoke({"kg": kg, "question": "...", "findings": {}, "answer": ""})
```
Nodes created by kg_node and routing functions created by kg_router both read the handle from state automatically.
Query nodes with kg_node
A kg_node wraps an IQL query as a LangGraph-compatible async function. The results land in your state under a key you choose:
```python
from inputlayer.integrations.langgraph import kg_node

search = kg_node(
    query="?relevant_fact(Id, Topic, Content)",
    state_key="findings",
)
graph.add_node("search", search)
```
After this node runs, state["findings"] holds a dict with columns, rows, and row_count. Downstream nodes can read it.
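A downstream node can turn that result into something easier to work with. The sketch below assumes rows is a list of value sequences ordered like columns; rows_as_dicts and summarize are hypothetical names, and the column names in the sample are assumptions rather than documented output.

```python
import asyncio
from typing import Any


def rows_as_dicts(result: dict[str, Any]) -> list[dict[str, Any]]:
    """Pair each row with the column names (assumes rows align with columns)."""
    columns = result.get("columns", [])
    return [dict(zip(columns, row)) for row in result.get("rows", [])]


async def summarize(state: dict[str, Any]) -> dict[str, Any]:
    # Read whatever the "search" node left under state["findings"].
    findings = state.get("findings") or {}
    facts = rows_as_dicts(findings)
    return {"answer": f"found {findings.get('row_count', len(facts))} relevant facts"}


if __name__ == "__main__":
    sample = {"columns": ["Id", "Topic", "Content"],
              "rows": [[1, "ml", "batch size matters"]], "row_count": 1}
    print(asyncio.run(summarize({"findings": sample})))
```

You would register it with graph.add_node("summarize", summarize) and add an edge from "search" to it.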
Queries can also be parameterized by state. Pass a callable and always escape user-supplied values:
```python
from inputlayer.integrations.langgraph import escape_iql

search = kg_node(
    query=lambda s: f'?article(Id, T, C, "{escape_iql(s["category"])}", E)',
    state_key="articles",
)
```
For write operations, pass a Relation class and set operation to "insert" or "delete":
```python
from inputlayer import Relation


class Finding(Relation):
    id: int
    topic: str
    confidence: float


store = kg_node(
    relation=Finding,
    operation="insert",
    state_key="new_findings",
)
```
The state under state_key can be a single Relation instance, a list of instances, a single dict matching the schema, or a list of such dicts. If the key is missing or the value is empty, the node is a no-op and returns an empty state update (no write happens). Make sure an upstream node populates state["new_findings"] before this runs.
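The accepted shapes can be summarized as a small normalization step. This is a hypothetical sketch of the contract described above, not the SDK's code; a dataclass stands in for the Relation subclass so the example runs without InputLayer installed.

```python
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any


@dataclass
class Finding:  # stand-in for the Relation subclass above
    id: int
    topic: str
    confidence: float


def normalize_rows(value: Any) -> list[dict[str, Any]]:
    """Accept one record or a list of records; return a list of row dicts."""
    if not value:  # missing or empty: nothing to write, the node is a no-op
        return []
    items = value if isinstance(value, list) else [value]
    rows = []
    for item in items:
        if is_dataclass(item):
            rows.append(asdict(item))
        elif isinstance(item, dict):
            rows.append(dict(item))
        else:
            raise TypeError(f"unsupported row type: {type(item).__name__}")
    return rows
```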
Rules-driven routing with kg_router
kg_router is a conditional-edge function. Each branch maps a target node to an IQL query. Branches are evaluated in insertion order (Python dicts preserve insertion order since 3.7). The first branch whose query returns a non-empty result wins. If no branch matches, the router returns the default:
```python
from inputlayer.integrations.langgraph import kg_router

route = kg_router(
    branches={
        "answer": "?ready_to_answer(X)",
        "gather": "?missing_info(X)",
        "escalate": "?needs_human(X)",
    },
    default="gather",
)
graph.add_conditional_edges("reason", route)
```
The magic happens in the rules. You define ready_to_answer, missing_info, and needs_human as derived relations:
```
+ready_to_answer(X) <- answer_fact(X, _), confidence_high(X)
+missing_info(X) <- question_fact(X, _), !answer_fact(X, _)
+needs_human(X) <- risk_fact(X, "high"), !human_approval(X)
```
Now, when your graph reaches the routing node, the rules fire against whatever facts are in the KG. The branch that matches reflects the current state of reasoning, not a hardcoded Python condition. Adding a new route means adding a new rule, not changing your graph code.
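To see what those three rules derive, the sketch below evaluates them by hand over Python sets of tuples. It only models the semantics (a shared X joins the atoms, and ! means "no matching fact exists"); InputLayer evaluates rules itself, and the sample facts are made up.

```python
def derive(facts: dict[str, set[tuple]]) -> dict[str, set[str]]:
    answered = {x for (x, _) in facts.get("answer_fact", set())}
    high = {x for (x,) in facts.get("confidence_high", set())}
    approved = {x for (x,) in facts.get("human_approval", set())}
    return {
        # ready_to_answer(X) <- answer_fact(X, _), confidence_high(X)
        "ready_to_answer": answered & high,
        # missing_info(X) <- question_fact(X, _), !answer_fact(X, _)
        "missing_info": {x for (x, _) in facts.get("question_fact", set())
                         if x not in answered},
        # needs_human(X) <- risk_fact(X, "high"), !human_approval(X)
        "needs_human": {x for (x, level) in facts.get("risk_fact", set())
                        if level == "high" and x not in approved},
    }
```

Adding an approval fact for a high-risk item removes it from needs_human, which is the change a router branch on ?needs_human(X) would observe on its next evaluation.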
The router skips branches that raise QueryError (bad query, unknown relation) or return an error response. Systemic failures (InputLayerConnectionError, AuthenticationError, QueryTimeoutError, ConnectionError, OSError) are re-raised immediately so you see them instead of silently misrouting.
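The branch-selection behavior can be modeled in plain Python. This is a hypothetical sketch of the documented semantics, not the SDK source: run_query stands in for the KG call, QueryError is a local stand-in for the SDK exception of the same name, and SystemicError stands in for the connection, auth, and timeout errors that propagate.

```python
from typing import Callable


class QueryError(Exception):
    """Local stand-in for a per-query failure (bad query, unknown relation)."""


class SystemicError(Exception):
    """Local stand-in for connection, auth, or timeout failures."""


def first_match(branches: dict[str, str],
                run_query: Callable[[str], list],
                default: str) -> str:
    for target, query in branches.items():  # dict insertion order
        try:
            rows = run_query(query)
        except QueryError:
            continue  # a broken branch is skipped; the next one is tried
        if rows:  # the first non-empty result wins
            return target
    # SystemicError is never caught here, so it reaches the caller.
    return default
```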
Putting it together
Here's a minimal reasoning loop. The graph repeatedly gathers facts, and rules decide when there's enough to answer:
```python
import asyncio

from inputlayer import InputLayer
from inputlayer.integrations.langgraph import InputLayerState, kg_router
from langgraph.graph import END, StateGraph


class ResearchState(InputLayerState):
    question: str
    iteration: int
    answer: str  # written by write_answer


async def gather_facts(state):
    # Your logic to insert new facts into the KG goes here.
    return {"iteration": state.get("iteration", 0) + 1}


async def write_answer(state):
    r = await state["kg"].execute("?answer_fact(Content)")
    return {"answer": r.rows[0][0] if r.rows else "I don't know"}


graph = StateGraph(ResearchState)
graph.add_node("gather", gather_facts)
# Node names must not collide with state keys, hence "write_answer", not "answer".
graph.add_node("write_answer", write_answer)
graph.add_conditional_edges("gather", kg_router(
    branches={"write_answer": "?ready_to_answer(X)"},
    default="gather",  # loop until the rules say we're ready
))
graph.set_entry_point("gather")
graph.add_edge("write_answer", END)
app = graph.compile()


async def main():
    async with InputLayer("ws://localhost:8080/ws", username="admin", password="admin") as il:
        kg = il.knowledge_graph("research")
        # ... define schema and rules ...
        result = await app.ainvoke({"kg": kg, "question": "...", "iteration": 0, "answer": ""})
        print(result["answer"])


asyncio.run(main())
```
Part 2: Resumable graphs with InputLayerCheckpointer
LangGraph's BaseCheckpointSaver lets graphs pause and resume across processes, machines, and time. InputLayerCheckpointer is an implementation that stores every checkpoint as a fact in your KG.
Why persist to a KG
Two reasons. First, the KG you already use for reasoning is now also your agent's durable state, so there's one system to operate and back up. Second, checkpoints become queryable facts. You can write rules over them, inspect them with .why(), or audit them with IQL. Your agent's execution history is first-class data.
Basic use
```python
from inputlayer.integrations.langgraph import InputLayerCheckpointer

checkpointer = InputLayerCheckpointer(kg=kg)
await checkpointer.setup()  # creates the schema (idempotent)
app = graph.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "user-42"}}
await app.ainvoke({"kg": kg, "question": "..."}, config=config)

# Later, after a process restart:
state = await app.aget_state(config)
# or resume:
await app.ainvoke(None, config=config)
```
Each thread gets its own history. Namespaces (checkpoint_ns) isolate subgraphs from their parent.
Storage management
Long-running agents accumulate checkpoints. Three methods help you see and manage what's stored:
```python
# List every thread_id that has at least one checkpoint
threads = await checkpointer.alist_threads()

# Keep only the 10 most recent checkpoints for a thread (default namespace)
await checkpointer.prune_thread("user-42", keep_last=10)

# Wipe a thread entirely, across every namespace (checkpoints + writes)
await checkpointer.adelete_thread("user-42")
```
"Most recent" is measured by the ts field, which is a nanosecond wall-clock timestamp taken when the checkpoint was written. If a thread has concurrent branches (rare in practice), all branches compete for the keep_last slots based on that wall time. The older checkpoints and their graph_write entries are deleted atomically per checkpoint; the batch delete is not a single transaction, so a crash mid-prune leaves a partially pruned state that a subsequent prune_thread call will finish.
prune_thread operates on a single checkpoint_ns (default "", the parent graph). If your agent uses subgraphs, call prune_thread once per namespace you want to trim. adelete_thread wipes every namespace for that thread in one call, which is the right tool when you are retiring the whole conversation.
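The keep_last selection can be sketched in a few lines. This assumes each checkpoint record exposes thread_id, checkpoint_ns, and ts as described; the Checkpoint dataclass and select_for_pruning are hypothetical, and the real method also removes the matching graph_write entries, which this sketch leaves out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Checkpoint:
    thread_id: str
    checkpoint_ns: str
    checkpoint_id: str
    ts: int  # nanosecond wall-clock timestamp taken at write time


def select_for_pruning(checkpoints: list[Checkpoint], thread_id: str,
                       keep_last: int, checkpoint_ns: str = "") -> list[Checkpoint]:
    """Return the checkpoints a keep_last prune would delete, newest first."""
    if keep_last < 0:
        raise ValueError("keep_last must be >= 0")
    # Only one namespace is considered, mirroring prune_thread's scope.
    scoped = [c for c in checkpoints
              if c.thread_id == thread_id and c.checkpoint_ns == checkpoint_ns]
    scoped.sort(key=lambda c: c.ts, reverse=True)  # concurrent branches compete on ts alone
    return scoped[keep_last:]
```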
All three have sync variants (list_threads, prune_thread_sync, and delete_thread, respectively) for non-async contexts.
Migrating from another checkpointer
The LangGraph interface is stable across implementations, so swapping in InputLayerCheckpointer only means changing the constructor and calling setup() once. From MemorySaver:
```python
# before
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()

# after
from inputlayer.integrations.langgraph import InputLayerCheckpointer
checkpointer = InputLayerCheckpointer(kg=kg)
await checkpointer.setup()  # create relations
```
From SqliteSaver or PostgresSaver: same swap. If you want to preserve history, the schema is relation-based, so you can replay old checkpoints by inserting into graph_checkpoint and graph_write directly. A lighter option is to point the new checkpointer at a clean KG and let it accumulate fresh history as agents run. Old checkpoints stay in the old store until you drop it.
Concurrency
Per-thread setup is guarded by a lock, so you can create one InputLayerCheckpointer and share it across coroutines. The underlying KG connection serializes commands, so concurrent aput and aget_tuple calls on the same thread are safe.
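Sharing one instance works because setup follows the usual double-checked locking pattern. The sketch below is a generic asyncio illustration with hypothetical names (LazySchema, ensure_setup), not the checkpointer's implementation: twenty concurrent callers trigger exactly one schema creation.

```python
import asyncio


class LazySchema:
    """Run an idempotent setup once, even when many coroutines race to it."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._ready = False
        self.setup_calls = 0

    async def _create_relations(self) -> None:
        self.setup_calls += 1
        await asyncio.sleep(0.01)  # pretend to talk to the KG

    async def ensure_setup(self) -> None:
        if self._ready:  # fast path once setup has finished
            return
        async with self._lock:
            if not self._ready:  # re-check: another coroutine may have won the race
                await self._create_relations()
                self._ready = True


async def demo() -> int:
    schema = LazySchema()
    await asyncio.gather(*(schema.ensure_setup() for _ in range(20)))
    return schema.setup_calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```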
Part 3: Semantic memory with InputLayerMemory
Checkpointing is about resuming execution. Memory is about remembering what was said. InputLayerMemory stores conversation turns as facts and uses rules to derive which turns connect to which.
The base schema
Two relations hold the raw conversation:
```
memory_turn(thread_id, turn_id, role, content, ts)
memory_topic(thread_id, turn_id, topic)
```
Three rules derive context automatically:
```
active_topic(ThreadId, Topic) <- memory_topic(ThreadId, _, Topic)

relevant_turn(ThreadId, TurnId, Role, Content, Topic) <-
    memory_turn(ThreadId, TurnId, Role, Content, _),
    memory_topic(ThreadId, TurnId, Topic)

topic_thread(ThreadId, TopicA, TopicB) <-
    memory_topic(ThreadId, _, TopicA),
    memory_topic(ThreadId, _, TopicB),
    TopicA != TopicB
```
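Evaluated by hand, the three rules amount to a projection, a join on (ThreadId, TurnId), and a self-join on ThreadId. A hypothetical Python sketch over tuples shaped like the two base relations (derive_memory is an illustrative name, not an SDK function):

```python
Turn = tuple[str, int, str, str, int]  # (thread_id, turn_id, role, content, ts)
Topic = tuple[str, int, str]           # (thread_id, turn_id, topic)


def derive_memory(turns: list[Turn], topics: list[Topic]):
    # active_topic(ThreadId, Topic) <- memory_topic(ThreadId, _, Topic)
    active_topic = {(t, topic) for (t, _, topic) in topics}
    # relevant_turn: join memory_turn and memory_topic on (ThreadId, TurnId)
    relevant_turn = {
        (t, turn_id, role, content, topic)
        for (t, turn_id, role, content, _ts) in turns
        for (t2, turn_id2, topic) in topics
        if (t, turn_id) == (t2, turn_id2)
    }
    # topic_thread: self-join memory_topic on ThreadId, distinct topics only
    topic_thread = {
        (t, a, b)
        for (t, _, a) in topics
        for (t2, _, b) in topics
        if t == t2 and a != b
    }
    return active_topic, relevant_turn, topic_thread
```

Because topic_thread only requires TopicA != TopicB, every pair is derived in both orders. The ctx["related_topics"] output in the recall example that follows lists each pair once, so recall presumably deduplicates unordered pairs.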
Storing and recalling
```python
from inputlayer.integrations.langgraph import InputLayerMemory

memory = InputLayerMemory(kg=kg)
await memory.setup()

# Store a turn. Topics can be extracted automatically or passed explicitly.
await memory.astore("user-42", "user", "Help me optimize my Python ML training loop",
                    topics=["python", "ml", "performance"])

# Recall derived context
ctx = await memory.arecall("user-42")
# ctx["topics"]         -> ["ml", "performance", "python"]
# ctx["recent"]         -> [{"turn_id": 1, "role": "user", "content": "..."}]
# ctx["relevant"]       -> {"ml": [...], "python": [...], "performance": [...]}
# ctx["related_topics"] -> [("ml", "python"), ("ml", "performance"), ("performance", "python")]
```
Wiring into a graph
InputLayerMemory exposes two factory methods that produce LangGraph-compatible nodes. Your state needs to carry the thread ID and the new message. The KG handle lives on the memory object itself, so you don't need InputLayerState for pure-memory graphs:
```python
from typing import TypedDict

from langgraph.graph import END, StateGraph


class ChatState(TypedDict, total=False):
    thread_id: str      # which conversation this is
    new_message: dict   # the incoming {"role": ..., "content": ...}
    context: dict       # recalled context (filled by recall_node)
    response: str       # your LLM's reply


async def your_llm_logic(state):
    # Placeholder: call your LLM with state["context"] here.
    return {"response": "..."}


# `memory` is the InputLayerMemory created in the previous snippet.
graph = StateGraph(ChatState)
graph.add_node("recall", memory.recall_node(state_key="context"))
graph.add_node("respond", your_llm_logic)
graph.add_node("store", memory.store_node(state_key="new_message"))
graph.set_entry_point("recall")
graph.add_edge("recall", "respond")
graph.add_edge("respond", "store")
graph.add_edge("store", END)
app = graph.compile()

await app.ainvoke({
    "thread_id": "user-42",
    "new_message": {"role": "user", "content": "Help with Python ML"},
})
```
Each turn through the graph:
- recall reads `state["thread_id"]` and writes derived context into `state["context"]`.
- respond is your own logic. The LLM uses `state["context"]` to generate a response.
- store reads `state["new_message"]` (a dict with `role` and `content`) and stores it as a new turn. Rules fire automatically.
The thread_key parameter defaults to "thread_id" on both nodes. If your state uses a different key, pass thread_key="your_key" explicitly.
Extending the ontology
The three base rules are a starting point. Add rules for whatever your agent needs to know. A rule for unresolved questions:
```
has_response(ThreadId, TurnId) <-
    memory_turn(ThreadId, TurnId, "user", _, _),
    memory_turn(ThreadId, NextTurn, "assistant", _, _),
    NextTurn > TurnId

unresolved(ThreadId, TurnId, Content) <-
    memory_turn(ThreadId, TurnId, "user", Content, _),
    !has_response(ThreadId, TurnId)
```
Add the rule once, and recall() picks up conclusions from it without any code change in your graph.
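The negation in unresolved reads as "no later assistant turn exists". A hypothetical Python rendering of the two rules over memory_turn-shaped tuples (the function name is illustrative only):

```python
Turn = tuple[str, int, str, str, int]  # (thread_id, turn_id, role, content, ts)


def unresolved(turns: list[Turn]) -> list[tuple[str, int, str]]:
    # has_response(T, U) <- user turn U, some assistant turn NextTurn > U in T
    has_response = {
        (t, u)
        for (t, u, role, _c, _ts) in turns if role == "user"
        for (t2, nxt, role2, _c2, _ts2) in turns
        if t2 == t and role2 == "assistant" and nxt > u
    }
    # unresolved(T, U, Content) <- user turn U, !has_response(T, U)
    return sorted(
        (t, u, content)
        for (t, u, role, content, _ts) in turns
        if role == "user" and (t, u) not in has_response
    )
```

As written, has_response treats any later assistant turn as a response, so two user turns followed by a single reply both count as answered. Tighten the rule if you need one reply per question.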
Production note on topic extraction
The built-in topic extractor is keyword-based and meant for demos. In production you have two options; pick whichever fits your pipeline:

- Pass explicit topics on every astore call. This is the most common path, because in a real agent you already have access to the LLM that generated the turn.

  ```python
  topics = await your_llm_topic_extractor(content)
  await memory.astore(thread_id, role, content, topics=topics)
  ```

- Plug an extractor into the memory object. If most turns enter memory through a store_node, install the extractor once:

  ```python
  import asyncio
  from concurrent.futures import ThreadPoolExecutor


  async def extract(content: str) -> list[str]:
      ...  # call your LLM, return the tags it found


  _pool = ThreadPoolExecutor(max_workers=1)


  def extract_sync(content: str) -> list[str]:
      # topic_extractor must be sync. If it is invoked from inside a running
      # event loop (as from an async store_node), asyncio.run() raises
      # RuntimeError, so run the coroutine on a worker thread with its own loop.
      # This blocks the caller while the LLM responds; a sync LLM client avoids the hop.
      return _pool.submit(asyncio.run, extract(content)).result()


  memory = InputLayerMemory(kg=kg, topic_extractor=extract_sync)
  ```
Either way, memory.astore(..., topics=[...]) always wins over the extractor, so you can override per-turn when needed. The ontology is what matters, not the extractor.
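For a sense of what "keyword-based" means, here is a hypothetical extractor with the str -> list[str] shape a sync topic_extractor takes. The keyword table is invented for illustration and is not the built-in vocabulary; its brittleness (no synonyms, no context) is the reason to prefer an LLM in production.

```python
import re

# Invented keyword table for illustration; not the SDK's built-in vocabulary.
TOPIC_KEYWORDS: dict[str, set[str]] = {
    "python": {"python", "pip", "asyncio"},
    "ml": {"ml", "training", "model", "neural"},
    "performance": {"optimize", "speed", "slow", "latency"},
}


def keyword_topics(content: str) -> list[str]:
    """Return the sorted topics whose keywords appear as whole words in content."""
    words = set(re.findall(r"[a-z0-9]+", content.lower()))
    return sorted(topic for topic, keys in TOPIC_KEYWORDS.items() if words & keys)
```

It could be installed the same way as extract_sync above, with InputLayerMemory(kg=kg, topic_extractor=keyword_topics).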
Listing stored conversations
For admin tools, cleanup jobs, or replay pipelines, you often want to know what's in memory:
```python
threads = await memory.alist_threads()  # sorted list of thread_ids
# or the blocking variant:
threads = memory.list_threads()
```
The list comes back sorted and contains every thread that has at least one stored turn.
Combining the checkpointer and memory in one graph
Real agents usually need both. The checkpointer keeps the graph resumable across restarts. The memory keeps the conversation coherent across many turns. They coexist happily because they use separate relations in the same KG and read the thread identifier from different places:
- `InputLayerCheckpointer` reads `config["configurable"]["thread_id"]`.
- `InputLayerMemory` reads `state["thread_id"]` (configurable via `thread_key`).
Wire both up and you get a durable, stateful chat agent in a few dozen lines:

```python
import asyncio
from typing import TypedDict

from inputlayer import InputLayer
from inputlayer.integrations.langgraph import (
    InputLayerCheckpointer,
    InputLayerMemory,
)
from langgraph.graph import END, StateGraph


class ChatState(TypedDict, total=False):
    thread_id: str
    new_message: dict
    context: dict
    response: str


async def respond(state):
    ctx = state.get("context", {})
    # Call your LLM with ctx["topics"], ctx["recent"], ctx["relevant"]
    return {"response": "..."}


async def main():
    async with InputLayer("ws://localhost:8080/ws",
                          username="admin", password="admin") as il:
        kg = il.knowledge_graph("chat_agent")
        memory = InputLayerMemory(kg=kg)
        checkpointer = InputLayerCheckpointer(kg=kg)
        await memory.setup()
        await checkpointer.setup()

        graph = StateGraph(ChatState)
        graph.add_node("recall", memory.recall_node(state_key="context"))
        graph.add_node("respond", respond)
        graph.add_node("store", memory.store_node(state_key="new_message"))
        graph.set_entry_point("recall")
        graph.add_edge("recall", "respond")
        graph.add_edge("respond", "store")
        graph.add_edge("store", END)
        app = graph.compile(checkpointer=checkpointer)

        thread = "alex-session-42"
        config = {"configurable": {"thread_id": thread}}
        await app.ainvoke(
            {"thread_id": thread,
             "new_message": {"role": "user", "content": "Help me with Python ML"}},
            config=config,
        )
        # Restart the process; a fresh `app` with the same checkpointer and
        # memory will resume from the last checkpoint, and `recall` still
        # surfaces derived context from every prior turn.


asyncio.run(main())
```
The thread_id appears twice on purpose. The state copy is what the memory nodes read. The config copy is what the checkpointer uses to key its state. Using the same value everywhere keeps the two aligned so "resume a thread" and "remember a conversation" mean the same thing.
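One way to keep the two copies from drifting is to build both from a single value. A minimal sketch with a hypothetical helper, chat_invocation; it also applies the non-empty-string check the integration enforces on thread IDs.

```python
from typing import Any


def chat_invocation(thread: str, role: str,
                    content: str) -> tuple[dict[str, Any], dict[str, Any]]:
    """Build graph input and config from one thread ID so the copies cannot drift."""
    if not isinstance(thread, str) or not thread:
        raise ValueError("thread_id must be a non-empty string")
    state = {"thread_id": thread, "new_message": {"role": role, "content": content}}
    config = {"configurable": {"thread_id": thread}}
    return state, config
```

In the graph above you would unpack the pair and call app.ainvoke(state, config=config).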
Safe parameter binding
When interpolating user input into IQL, always use escape_iql:
```python
from inputlayer.integrations.langgraph import escape_iql

user_query = state["user_input"]  # untrusted
q = f'?article(Id, T, "{escape_iql(user_query)}", E)'
```
escape_iql handles backslashes, double-quotes, newlines, tabs, nul bytes, and other control characters so the resulting string is a valid IQL literal. Never concatenate untrusted input into IQL without it.
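To show why escaping matters, here is a hypothetical escaper that follows the list above. The escape sequences it emits (C-style `\\`, `\"`, `\n`, `\t`, `\r`, and `\uXXXX` for other control characters) are assumptions about IQL string syntax, so use the SDK's escape_iql in real code; the point is that crafted input can no longer close the string literal early.

```python
# C-style escapes for the characters that could end or corrupt a literal.
_SIMPLE_ESCAPES = {"\\": "\\\\", '"': '\\"', "\n": "\\n", "\t": "\\t", "\r": "\\r"}


def escape_literal(value: str) -> str:
    """Hypothetical IQL string escaper (escape syntax assumed, not documented)."""
    out = []
    for ch in value:
        if ch in _SIMPLE_ESCAPES:
            out.append(_SIMPLE_ESCAPES[ch])
        elif ord(ch) < 0x20 or ord(ch) == 0x7F:
            # Remaining control characters, including NUL, as \uXXXX.
            out.append(f"\\u{ord(ch):04x}")
        else:
            out.append(ch)  # printable and non-ASCII text passes through
    return "".join(out)


if __name__ == "__main__":
    evil = 'x", E), +admin("me'
    print(f'?article(Id, T, "{escape_literal(evil)}", E)')
```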
Sync and async
Every primitive supports both sync and async. The async methods (aput, aget_tuple, astore, arecall, adelete_thread, etc.) are the primary API. Sync wrappers (put, get_tuple, store, recall, delete_thread, etc.) work inside Jupyter, scripts, and any non-async context.
Examples
The SDK ships with 12 runnable examples at examples/langgraph/:
| # | Example | What it shows |
|---|---|---|
| 1 | Reasoning loop | Accumulate facts, rules decide when to stop |
| 2 | Investigation | Multi-step evidence gathering |
| 3 | Human-in-the-loop | Policy rules gate actions for approval |
| 4 | Branching pipeline | Route documents through parallel analysis |
| 5 | Self-correcting agent | Validation rules catch and fix errors |
| 6 | Collaborative planning | Multi-agent task decomposition |
| 7 | Event correlation | Pattern detection across event streams |
| 8 | Tool selection | Rules pick the right tool per context |
| 9 | Streaming aggregation | Threshold-based alerts from streaming data |
| 10 | Resumable graph | Checkpoint, crash, resume from persisted state |
| 11 | Semantic memory | Store turns as facts, recall derived context |
| 12 | Resumable chat | Checkpointer and memory together on one KG |
Run them:
```shell
# Show the menu
uv run python -m examples.langgraph.runner
# Run specific examples
uv run python -m examples.langgraph.runner 1 10 11
# Run a range
uv run python -m examples.langgraph.runner 1
# Run all
uv run python -m examples.langgraph.runner
```
Examples marked [LLM] need an OpenAI-compatible server at localhost:1234 (LM Studio works). Set INPUTLAYER_URL, INPUTLAYER_USER, INPUTLAYER_PASSWORD to override the defaults.
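If your own scripts should honor the same overrides, reading them takes a few lines. A sketch that assumes the defaults used throughout this guide (the runner's actual defaults may differ); connection_settings is a hypothetical helper.

```python
import os


def connection_settings() -> dict[str, str]:
    # Environment variables win; otherwise fall back to this guide's defaults.
    return {
        "url": os.environ.get("INPUTLAYER_URL", "ws://localhost:8080/ws"),
        "username": os.environ.get("INPUTLAYER_USER", "admin"),
        "password": os.environ.get("INPUTLAYER_PASSWORD", "admin"),
    }
```

Pass the result to the client as InputLayer(s["url"], username=s["username"], password=s["password"]).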
Troubleshooting
ValueError: thread_id must be a non-empty string. The integration expects a non-empty string thread identifier everywhere it asks for one. Thread IDs, roles, and content are base64-encoded before they touch IQL, so any Unicode string is safe on the wire. The only requirement is that the ID itself is a non-empty str.
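The encoding is what makes arbitrary Unicode safe: base64 output uses only ASCII letters, digits, +, / and =, none of which need escaping inside a quoted literal. A sketch of the round trip, assuming standard base64 over UTF-8 (the exact variant the SDK uses is not stated here); encode_id and decode_id are hypothetical names.

```python
import base64


def encode_id(value: str) -> str:
    """Base64-encode a thread ID so it can sit inside an IQL string literal."""
    if not isinstance(value, str) or not value:
        raise ValueError("thread_id must be a non-empty string")
    return base64.b64encode(value.encode("utf-8")).decode("ascii")


def decode_id(encoded: str) -> str:
    return base64.b64decode(encoded).decode("utf-8")
```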
KeyError: thread_id. The checkpointer expects a config like {"configurable": {"thread_id": "..."}} on every call. Pass it to app.ainvoke and app.aget_state.
RuntimeError: KG returned an error. The KG rejected a query. The error message includes the offending IQL. Common causes: a relation you reference doesn't exist yet (call setup() first), or a rule has a syntax error.
Router always returns default. The branch queries are running, but no branch matches. Check the rules that derive those relations are installed (.rule list in the REPL), and that the facts those rules depend on are present.
Graph hangs. The checkpointer and memory have a 30-second default timeout on every KG call. If you see a hang longer than that, the underlying WebSocket connection is stuck. Enable logging.getLogger("inputlayer").setLevel("DEBUG") to see command traffic.
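If setLevel alone prints nothing, no handler is attached: Python's last-resort handler only emits WARNING and above. A minimal setup that surfaces DEBUG records from the inputlayer logger named above:

```python
import logging

# Attach a root handler; records from child loggers propagate to it.
# (basicConfig is a no-op if the root logger already has handlers.)
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")

# Lower the threshold only for the SDK's logger, keeping other libraries quiet.
logging.getLogger("inputlayer").setLevel(logging.DEBUG)
```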
RuntimeError: 1/3 topic inserts failed. astore inserts the turn row first, then its topics concurrently. If the KG is overloaded or the connection drops mid-batch, some topic rows may be missing for that turn. The turn itself is already persisted. Blindly retrying astore with the same content creates a new turn with a fresh ID, so you get two rows for the same content. Prefer one of: (a) insert the missing topics directly with +memory_topic("<thread>", <turn_id>, "<topic>"), using the turn_id that astore returned, or (b) call adelete_thread and replay the conversation.
Known limitations
- Non-atomic writes. `aput_writes` deletes existing writes for a task before inserting new ones. If the process crashes between the delete and the insert, writes for that task are lost. This is documented in the source and mirrors the tradeoff in other checkpointer implementations.
- Soft thread-tracking limit. `InputLayerMemory` keeps per-thread locks and turn counters in memory. Once `max_tracked_threads` is exceeded, idle threads are evicted. If all tracked threads are active, excess threads are admitted with a warning and evicted once idle.
- Topic extractor is keyword-based. The built-in extractor is for demos. Use an LLM in production and pass topics explicitly to `astore()`.
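The soft limit is easiest to picture as code. A hypothetical sketch, assuming idle threads are evicted least-recently-used first; the real InputLayerMemory internals are not documented here, and this sketch simply restarts an evicted thread's counter at 1 when it returns.

```python
import logging
from collections import OrderedDict

log = logging.getLogger(__name__)


class ThreadTracker:
    """Soft-limit tracker: evict idle threads first, never active ones."""

    def __init__(self, max_tracked_threads: int) -> None:
        self.max = max_tracked_threads
        self.turns: OrderedDict[str, int] = OrderedDict()  # thread -> turn counter, LRU order
        self.active: set[str] = set()

    def _evict_idle(self) -> None:
        for tid in list(self.turns):  # oldest first
            if len(self.turns) <= self.max:
                break
            if tid not in self.active:
                del self.turns[tid]

    def begin(self, tid: str) -> int:
        self.active.add(tid)
        self.turns[tid] = self.turns.get(tid, 0) + 1
        self.turns.move_to_end(tid)
        self._evict_idle()
        if len(self.turns) > self.max:
            log.warning("all tracked threads are active; admitting %s over the limit", tid)
        return self.turns[tid]

    def end(self, tid: str) -> None:
        self.active.discard(tid)
        self._evict_idle()  # excess threads leave once they go idle
```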