LangGraph Integration

Graph-based agents have a control-flow problem. The moment your agent needs to decide "should I call another tool or answer?", "is this action safe to execute?", or "which past turn connects to what the user just said?", the routing logic creeps into your Python code. Before long, business rules live in if statements scattered across nodes, and every new condition means another code change.

InputLayer's LangGraph integration moves that logic into rules. You store facts, write rules that derive conclusions about what's true, and your graph queries those conclusions to decide where to go next. The graph still runs the steps. The rules decide what the steps mean.

This guide walks through the integration from install to resumable agents with semantic memory. Each section builds on the previous one and introduces just enough IQL (InputLayer's query language) to keep going.

Prerequisites

You need two things to run any example in this guide.

  1. A running InputLayer server. The fastest way is Docker:

    docker run -p 8080:8080 ghcr.io/inputlayer/inputlayer:latest

    The server listens for WebSocket connections at ws://localhost:8080/ws.

  2. Python 3.10 or newer. Check with python --version.

Confirm the server is reachable before continuing:

curl -sf http://localhost:8080/health && echo "server is up"

Installation

pip install "inputlayer-client-dev[langgraph]"

This pulls in langgraph and langchain-core alongside the InputLayer SDK.

Quick start

The smallest useful program. It stores a fact, writes a rule, and lets a kg_router decide which branch to take:


import asyncio

from inputlayer import InputLayer
from inputlayer.integrations.langgraph import InputLayerState, kg_router
from langgraph.graph import END, START, StateGraph

class State(InputLayerState):
    question: str
    answer: str

async def ask(state):
    # pretend an LLM answered. In a real app, call your model here.
    return {"answer": "blue"}

async def say_hi(state):
    return {"answer": "hi"}

graph = StateGraph(State)
graph.add_node("ask", ask)
graph.add_node("say_hi", say_hi)
graph.set_conditional_entry_point(
    kg_router(
        branches={"ask": "?is_question(X)", "say_hi": "?is_greeting(X)"},
        default="ask",
    ),
)
graph.add_edge("ask", END)
graph.add_edge("say_hi", END)
app = graph.compile()

async def main():
    async with InputLayer("ws://localhost:8080/ws",
                          username="admin", password="admin") as il:
        kg = il.knowledge_graph("qs_langgraph")
        await kg.execute("+is_question(x: string)")
        await kg.execute("+is_greeting(x: string)")
        await kg.execute('+is_question("q1")')
        result = await app.ainvoke({"kg": kg, "question": "what color?", "answer": ""})
        print(result["answer"])  # "blue"

asyncio.run(main())

That's the whole idea. A fact (+is_question("q1")) drives a routing decision. Delete the fact and the default branch wins. No Python if changed.

The building blocks

The integration exports six primitives. Each plugs into LangGraph at a specific seam:

Primitive               What it does
InputLayerState         A TypedDict base that carries the KG handle through your graph state
kg_node                 A factory for query, insert, or delete graph nodes
kg_router               A conditional-edge function driven by IQL queries
InputLayerCheckpointer  Persists graph state in a KG so runs can resume across processes
InputLayerMemory        Semantic long-term memory with rule-derived conversation context
escape_iql              String escaping for safe IQL interpolation in parameterized queries

You don't need all six to get started. The most common pattern uses InputLayerState, kg_node, and kg_router to let rules drive the graph. InputLayerCheckpointer and InputLayerMemory are independent additions you layer in when you need them.

Part 1: Rules-driven routing

The core idea is simple. Your graph has nodes that do work. Between nodes, you need to decide what to do next. Instead of writing that decision in Python, write it as an IQL query and let the rules answer.

State that carries the KG

First, define your state. Extend InputLayerState so the KG handle flows through your graph:

from inputlayer.integrations.langgraph import InputLayerState

class AgentState(InputLayerState):
    question: str
    findings: list[str]
    answer: str

InputLayerState requires a kg field. When you invoke the graph, pass the KG handle in the initial state:

await app.ainvoke({"kg": kg, "question": "...", "findings": [], "answer": ""})

Nodes created by kg_node and routing functions created by kg_router both read the handle from state automatically.

Query nodes with kg_node

A kg_node wraps an IQL query as a LangGraph-compatible async function. The results land in your state under a key you choose:

from inputlayer.integrations.langgraph import kg_node

search = kg_node(
    query="?relevant_fact(Id, Topic, Content)",
    state_key="findings",
)
graph.add_node("search", search)

After this node runs, state["findings"] holds a dict with columns, rows, and row_count keys, so declare that state field as a dict rather than list[str]. Downstream nodes can read it.

Queries can also be parameterized by state. Pass a callable and always escape user-supplied values:

from inputlayer.integrations.langgraph import escape_iql

search = kg_node(
    query=lambda s: f'?article(Id, T, C, "{escape_iql(s["category"])}", E)',
    state_key="articles",
)

For write operations, pass a Relation class and set operation to "insert" or "delete":

from inputlayer import Relation

class Finding(Relation):
    id: int
    topic: str
    confidence: float

store = kg_node(
    relation=Finding,
    operation="insert",
    state_key="new_findings",
)

The state under state_key can be a single Relation instance, a list of instances, a single dict matching the schema, or a list of such dicts. If the key is missing or the value is empty, the node is a no-op and returns an empty state update (no write happens). Make sure an upstream node populates state["new_findings"] before this runs.
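The four accepted shapes and the no-op case can be pictured with a small normalization step. This is a minimal sketch in plain Python, not the SDK's implementation: FindingRow is a hypothetical dataclass standing in for a Relation instance, and normalize_rows and insert_update are illustrative names.

```python
from dataclasses import asdict, dataclass, is_dataclass


@dataclass
class FindingRow:
    """Hypothetical stand-in for a Relation instance (not the SDK class)."""
    id: int
    topic: str
    confidence: float


def normalize_rows(value):
    """Coerce the four accepted shapes into a list of plain dicts."""
    if not value:
        # Missing key (None), empty list, or empty dict: nothing to write.
        return []
    # A single instance or a single dict becomes a one-element list.
    items = value if isinstance(value, (list, tuple)) else [value]
    rows = []
    for item in items:
        if is_dataclass(item) and not isinstance(item, type):
            rows.append(asdict(item))
        elif isinstance(item, dict):
            rows.append(dict(item))
        else:
            raise TypeError(f"unsupported row type: {type(item).__name__}")
    return rows


def insert_update(state, state_key):
    """Return the rows that would be written; an empty list means no-op."""
    return normalize_rows(state.get(state_key))


single = insert_update({"new_findings": FindingRow(1, "ml", 0.9)}, "new_findings")
many = insert_update(
    {"new_findings": [{"id": 2, "topic": "db", "confidence": 0.5}]}, "new_findings"
)
missing = insert_update({}, "new_findings")
print(single, many, missing)
```

The point of the sketch is the last call: when the key is absent, the row list is empty and no write would be attempted.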

Rules-driven routing with kg_router

kg_router is a conditional-edge function. Each branch maps a target node to an IQL query. Branches are evaluated in insertion order (Python dicts preserve insertion order since 3.7). The first branch whose query returns a non-empty result wins. If no branch matches, the router returns the default:

from inputlayer.integrations.langgraph import kg_router

route = kg_router(
    branches={
        "answer": "?ready_to_answer(X)",
        "gather": "?missing_info(X)",
        "escalate": "?needs_human(X)",
    },
    default="gather",
)
graph.add_conditional_edges("reason", route)

The magic happens in the rules. You define ready_to_answer, missing_info, and needs_human as derived relations:

+ready_to_answer(X) <- answer_fact(X, _), confidence_high(X)
+missing_info(X) <- question_fact(X, _), !answer_fact(X, _)
+needs_human(X) <- risk_fact(X, "high"), !human_approval(X)

Now, when your graph reaches the routing node, the rules fire against whatever facts are in the KG. The branch that matches reflects the current state of reasoning, not a hardcoded Python condition. Adding a new route means adding a new rule, not changing your graph code.
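To make the rule semantics concrete, here is a rough Python model of the three rules above using sets. It is only an illustration of how joins and negation combine, under the assumption that each relation is a set of tuples; the real evaluation happens inside the InputLayer server, and the function name derive is made up for this sketch.

```python
def derive(facts):
    """Evaluate the three routing rules over in-memory fact sets."""
    answered = {x for (x, _) in facts["answer_fact"]}
    asked = {x for (x, _) in facts["question_fact"]}
    high_risk = {x for (x, level) in facts["risk_fact"] if level == "high"}
    return {
        # ready_to_answer(X) <- answer_fact(X, _), confidence_high(X)
        "ready_to_answer": answered & facts["confidence_high"],
        # missing_info(X) <- question_fact(X, _), !answer_fact(X, _)
        "missing_info": asked - answered,
        # needs_human(X) <- risk_fact(X, "high"), !human_approval(X)
        "needs_human": high_risk - facts["human_approval"],
    }


facts = {
    "question_fact": {("q1", "what color?"), ("q2", "how many?")},
    "answer_fact": {("q1", "blue")},
    "confidence_high": {"q1"},
    "risk_fact": {("q2", "high")},
    "human_approval": set(),
}
derived = derive(facts)

# Adding one fact changes the derived conclusions without touching any code.
after_approval = derive({**facts, "human_approval": {"q2"}})
print(derived["needs_human"], after_approval["needs_human"])
```

Recording the approval fact empties needs_human, which is what changes the route on the next pass through the router.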

The router skips branches that raise QueryError (bad query, unknown relation) or return an error response. Systemic failures (InputLayerConnectionError, AuthenticationError, QueryTimeoutError, ConnectionError, OSError) are re-raised immediately so you see them instead of silently misrouting.
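The first-match and error-handling behavior described above can be sketched in a few lines of plain Python. This is an assumption-laden model, not the library's code: SketchQueryError stands in for the SDK's QueryError, and first_match and fake_run_query are hypothetical names used only here.

```python
class SketchQueryError(Exception):
    """Hypothetical stand-in for the SDK's QueryError."""


def first_match(branches, run_query, default):
    """Return the first branch whose query yields rows, else the default."""
    for target, query in branches.items():  # dicts keep insertion order
        try:
            rows = run_query(query)
        except SketchQueryError:
            continue  # bad query or unknown relation: skip this branch
        # Any other exception (for example ConnectionError) propagates.
        if rows:
            return target
    return default


def fake_run_query(query):
    """Toy backend: one branch errors, one is empty, one matches."""
    if query == "?ready_to_answer(X)":
        raise SketchQueryError("unknown relation")
    return {"?needs_human(X)": [("q2",)]}.get(query, [])


branches = {
    "answer": "?ready_to_answer(X)",
    "gather": "?missing_info(X)",
    "escalate": "?needs_human(X)",
}
chosen = first_match(branches, fake_run_query, default="gather")
fallback = first_match({"gather": "?missing_info(X)"}, fake_run_query, default="gather")
print(chosen, fallback)
```

Because only the query-level error is caught, a dropped connection surfaces as an exception instead of quietly falling through to the default branch.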

Putting it together

Here's a minimal reasoning loop. The graph repeatedly gathers facts, and rules decide when there's enough to answer:

import asyncio

from inputlayer import InputLayer
from inputlayer.integrations.langgraph import InputLayerState, kg_router
from langgraph.graph import END, StateGraph

class ResearchState(InputLayerState):
    question: str
    iteration: int
    # Named final_answer because a LangGraph node name ("answer") cannot
    # also be used as a state key.
    final_answer: str

async def gather_facts(state):
    # your logic to insert new facts into the KG
    return {"iteration": state.get("iteration", 0) + 1}

async def answer(state):
    r = await state["kg"].execute("?answer_fact(Content)")
    return {"final_answer": r.rows[0][0] if r.rows else "I don't know"}

graph = StateGraph(ResearchState)
graph.add_node("gather", gather_facts)
graph.add_node("answer", answer)
graph.add_conditional_edges("gather", kg_router(
    branches={"answer": "?ready_to_answer(X)"},
    default="gather",  # loop until the rule fires
))
graph.set_entry_point("gather")
graph.add_edge("answer", END)
app = graph.compile()

async def main():
    async with InputLayer("ws://localhost:8080/ws", username="admin", password="admin") as il:
        kg = il.knowledge_graph("research")
        # ... define schema and rules ...
        result = await app.ainvoke(
            {"kg": kg, "question": "...", "iteration": 0, "final_answer": ""}
        )
        print(result["final_answer"])

asyncio.run(main())

Part 2: Resumable graphs with InputLayerCheckpointer

LangGraph's BaseCheckpointSaver lets graphs pause and resume across processes, machines, and time. InputLayerCheckpointer is an implementation that stores every checkpoint as a fact in your KG.

Why persist to a KG

Two reasons. First, the KG you already use for reasoning is now also your agent's durable state, so there's one system to operate and back up. Second, checkpoints become queryable facts. You can write rules over them, inspect them with .why(), or audit them with IQL. Your agent's execution history is first-class data.

Basic use

from inputlayer.integrations.langgraph import InputLayerCheckpointer

checkpointer = InputLayerCheckpointer(kg=kg)
await checkpointer.setup()  # creates the schema (idempotent)

app = graph.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "user-42"}}
await app.ainvoke({"kg": kg, "question": "..."}, config=config)

# Later, after a process restart:
state = await app.aget_state(config)
# or resume:
await app.ainvoke(None, config=config)

Each thread gets its own history. Namespaces (checkpoint_ns) isolate subgraphs from their parent.

Storage management

Long-running agents accumulate checkpoints. Three methods help you see and manage what's stored:

# List every thread_id that has at least one checkpoint
threads = await checkpointer.alist_threads()

# Keep only the 10 most recent checkpoints for a thread (default namespace)
await checkpointer.prune_thread("user-42", keep_last=10)

# Wipe a thread entirely, across every namespace (checkpoints + writes)
await checkpointer.adelete_thread("user-42")

"Most recent" is measured by the ts field, which is a nanosecond wall-clock timestamp taken when the checkpoint was written. If a thread has concurrent branches (rare in practice), all branches compete for the keep_last slots based on that wall time. The older checkpoints and their graph_write entries are deleted atomically per checkpoint; the batch delete is not a single transaction, so a crash mid-prune leaves a partially pruned state that a subsequent prune_thread call will finish.

prune_thread operates on a single checkpoint_ns (default "", the parent graph). If your agent uses subgraphs, call prune_thread once per namespace you want to trim. adelete_thread wipes every namespace for that thread in one call, which is the right tool when you are retiring the whole conversation.
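The selection logic described here (newest by ts, scoped to one thread and one namespace) can be modeled as follows. This is a sketch under stated assumptions: Checkpoint is a hypothetical record type and select_for_pruning is an illustrative helper, not a method of InputLayerCheckpointer.

```python
from collections import namedtuple

# Hypothetical record shape; the real relation has more columns.
Checkpoint = namedtuple("Checkpoint", "thread_id checkpoint_ns checkpoint_id ts")


def select_for_pruning(checkpoints, thread_id, keep_last, checkpoint_ns=""):
    """Return the checkpoints that fall outside the keep_last newest ones."""
    if keep_last < 0:
        raise ValueError("keep_last must be >= 0")
    scoped = [
        c for c in checkpoints
        if c.thread_id == thread_id and c.checkpoint_ns == checkpoint_ns
    ]
    # Newest first by wall-clock timestamp; branches compete on ts alone.
    scoped.sort(key=lambda c: c.ts, reverse=True)
    return scoped[keep_last:]


checkpoints = [
    Checkpoint("user-42", "", "c1", 100),
    Checkpoint("user-42", "", "c2", 200),
    Checkpoint("user-42", "", "c3", 300),
    Checkpoint("user-42", "", "c4", 400),
    Checkpoint("user-42", "", "c5", 500),
    Checkpoint("user-42", "sub", "s1", 50),   # subgraph namespace
    Checkpoint("other", "", "o1", 10),        # different thread
]
doomed = select_for_pruning(checkpoints, "user-42", keep_last=2)
print([c.checkpoint_id for c in doomed])
```

The subgraph checkpoint s1 is older than everything else but survives, because the call only looks at the default namespace.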

All three have sync variants for non-async contexts: list_threads, prune_thread_sync, and delete_thread.

Migrating from another checkpointer

The LangGraph interface is stable across implementations, so swapping in InputLayerCheckpointer means changing the import and constructor and adding one setup() call. From MemorySaver:

# before
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()

# after
from inputlayer.integrations.langgraph import InputLayerCheckpointer
checkpointer = InputLayerCheckpointer(kg=kg)
await checkpointer.setup()  # create relations

From SqliteSaver or PostgresSaver: same swap. If you want to preserve history, the schema is relation-based, so you can replay old checkpoints by inserting into graph_checkpoint and graph_write directly. A lighter option is to point the new checkpointer at a clean KG and let it accumulate fresh history as agents run. Old checkpoints stay in the old store until you drop it.

Concurrency

Per-thread setup is guarded by a lock, so you can create one InputLayerCheckpointer and share it across coroutines. The underlying KG connection serializes commands, so concurrent aput and aget_tuple calls on the same thread are safe.
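One common way to get "set up once per thread, even under concurrency" is a lock per thread ID. The following is a generic asyncio sketch of that pattern, offered as an assumption about the general technique rather than a description of the checkpointer's internals; SetupGuard and its attributes are hypothetical names.

```python
import asyncio


class SetupGuard:
    """Runs per-thread setup at most once, even with concurrent callers."""

    def __init__(self):
        self._locks = {}
        self._ready = set()
        self.setup_calls = 0

    async def ensure(self, thread_id):
        # One lock per thread_id, so different threads do not block each other.
        lock = self._locks.setdefault(thread_id, asyncio.Lock())
        async with lock:
            if thread_id in self._ready:
                return  # another coroutine already finished the setup
            await asyncio.sleep(0)  # stands in for a round trip to the KG
            self.setup_calls += 1
            self._ready.add(thread_id)


async def demo():
    guard = SetupGuard()
    # Five coroutines race to set up the same thread.
    await asyncio.gather(*(guard.ensure("user-42") for _ in range(5)))
    return guard.setup_calls


calls = asyncio.run(demo())
print(calls)
```

The check inside the lock is what prevents the four late arrivals from repeating the work after the first coroutine releases it.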

Part 3: Semantic memory with InputLayerMemory

Checkpointing is about resuming execution. Memory is about remembering what was said. InputLayerMemory stores conversation turns as facts and uses rules to derive which turns connect to which.

The base schema

Two relations hold the raw conversation:

memory_turn(thread_id, turn_id, role, content, ts)
memory_topic(thread_id, turn_id, topic)

Three rules derive context automatically:

active_topic(ThreadId, Topic) <- memory_topic(ThreadId, _, Topic)

relevant_turn(ThreadId, TurnId, Role, Content, Topic) <-
    memory_turn(ThreadId, TurnId, Role, Content, _),
    memory_topic(ThreadId, TurnId, Topic)

topic_thread(ThreadId, TopicA, TopicB) <-
    memory_topic(ThreadId, _, TopicA),
    memory_topic(ThreadId, _, TopicB),
    TopicA != TopicB
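For readers new to rule languages, the three rules can be read as joins over the two base relations. The sketch below models them in plain Python, assuming the tuple layouts listed above; derive_context is an illustrative name, and the real derivation runs inside the KG.

```python
def derive_context(memory_turn, memory_topic, thread_id):
    """Model active_topic, relevant_turn, and topic_thread for one thread."""
    # active_topic(ThreadId, Topic) <- memory_topic(ThreadId, _, Topic)
    topics = sorted({t for (th, _, t) in memory_topic if th == thread_id})

    # relevant_turn joins a turn with each topic tagged on that same turn.
    relevant = {}
    for (th, turn_id, role, content, _ts) in memory_turn:
        for (th2, turn2, topic) in memory_topic:
            if th == thread_id and th2 == thread_id and turn_id == turn2:
                relevant.setdefault(topic, []).append((turn_id, role, content))

    # topic_thread pairs any two distinct topics seen in the same thread.
    # Read literally, the rule yields both (A, B) and (B, A).
    topic_thread = {(a, b) for a in topics for b in topics if a != b}
    return {"topics": topics, "relevant": relevant, "topic_thread": topic_thread}


memory_turn = [("user-42", 1, "user", "Help me optimize my Python ML training loop", 1000)]
memory_topic = [
    ("user-42", 1, "python"),
    ("user-42", 1, "ml"),
    ("user-42", 1, "performance"),
]
ctx = derive_context(memory_turn, memory_topic, "user-42")

# Keeping one orientation per pair gives an unordered view of the same data.
unordered = {(a, b) for (a, b) in ctx["topic_thread"] if a < b}
print(ctx["topics"], sorted(unordered))
```

Note that topic_thread does not require the two topics to share a turn, only a thread, so topics from different turns of one conversation are also linked.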

Storing and recalling

from inputlayer.integrations.langgraph import InputLayerMemory

memory = InputLayerMemory(kg=kg)
await memory.setup()

# Store a turn. Topics can be extracted automatically or passed explicitly.
await memory.astore("user-42", "user", "Help me optimize my Python ML training loop",
                    topics=["python", "ml", "performance"])

# Recall derived context
ctx = await memory.arecall("user-42")
# ctx["topics"] -> ["ml", "performance", "python"]
# ctx["recent"] -> [{"turn_id": 1, "role": "user", "content": "..."}]
# ctx["relevant"] -> {"ml": [...], "python": [...], "performance": [...]}
# ctx["related_topics"] -> [("ml", "python"), ("ml", "performance"), ("performance", "python")]

Wiring into a graph

InputLayerMemory exposes two factory methods that produce LangGraph-compatible nodes. Your state needs to carry the thread ID and the new message. The KG handle lives on the memory object itself, so you don't need InputLayerState for pure-memory graphs:

from typing import TypedDict

from langgraph.graph import END, StateGraph

class ChatState(TypedDict, total=False):
    thread_id: str          # which conversation this is
    new_message: dict       # the incoming {"role": ..., "content": ...}
    context: dict           # recalled context (filled by recall_node)
    response: str           # your LLM's reply

graph = StateGraph(ChatState)
graph.add_node("recall", memory.recall_node(state_key="context"))
graph.add_node("respond", your_llm_logic)
graph.add_node("store", memory.store_node(state_key="new_message"))

graph.set_entry_point("recall")
graph.add_edge("recall", "respond")
graph.add_edge("respond", "store")
graph.add_edge("store", END)

app = graph.compile()
await app.ainvoke({
    "thread_id": "user-42",
    "new_message": {"role": "user", "content": "Help with Python ML"},
})

Each turn through the graph:

  1. recall reads state["thread_id"] and writes derived context into state["context"]
  2. respond is your own logic. The LLM uses state["context"] to generate a response
  3. store reads state["new_message"] (a dict with role and content) and stores it as a new turn. Rules fire automatically

The thread_key parameter defaults to "thread_id" on both nodes. If your state uses a different key, pass thread_key="your_key" explicitly.

Extending the ontology

The three base rules are a starting point. Add rules for whatever your agent needs to know. A rule for unresolved questions:

has_response(ThreadId, TurnId) <-
    memory_turn(ThreadId, TurnId, "user", _, _),
    memory_turn(ThreadId, NextTurn, "assistant", _, _),
    NextTurn > TurnId

unresolved(ThreadId, TurnId, Content) <-
    memory_turn(ThreadId, TurnId, "user", Content, _),
    !has_response(ThreadId, TurnId)

Add the rule once, and recall() picks up conclusions from it without any code change in your graph.
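The negation in the unresolved rule is easier to see with data. Here is a small Python model of the two rules, assuming the memory_turn tuple layout from the base schema; unresolved_turns is a hypothetical helper for illustration, not part of InputLayerMemory.

```python
def unresolved_turns(memory_turn, thread_id):
    """User turns with no later assistant turn in the same thread."""
    turns = [t for t in memory_turn if t[0] == thread_id]
    assistant_ids = [tid for (_, tid, role, _, _) in turns if role == "assistant"]
    result = []
    for (_, turn_id, role, content, _ts) in turns:
        if role != "user":
            continue
        # has_response: some assistant turn with a larger turn id exists.
        has_response = any(a > turn_id for a in assistant_ids)
        if not has_response:
            result.append((turn_id, content))
    return sorted(result)


memory_turn = [
    ("user-42", 1, "user", "How do I speed up training?", 1000),
    ("user-42", 2, "assistant", "Try mixed precision.", 1001),
    ("user-42", 3, "user", "Does that work on CPU?", 1002),
    ("other", 1, "user", "Unrelated thread", 1003),
]
pending = unresolved_turns(memory_turn, "user-42")
print(pending)
```

One consequence of the rule as written: any later assistant turn counts as a response to every earlier user turn, so it flags only trailing user messages.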

Production note on topic extraction

The built-in topic extractor is keyword-based and meant for demos. In production you have two options. Pick whichever fits your pipeline:

  1. Pass explicit topics on every astore call. This is the most common path, because in a real agent you already have access to the LLM that generated the turn.

    topics = await your_llm_topic_extractor(content)
    await memory.astore(thread_id, role, content, topics=topics)

  2. Plug an extractor into the memory object. If most turns enter memory through a store_node, install the extractor once:

    import asyncio

    async def extract(content: str) -> list[str]:
        ...  # call your LLM, return the tags it found

    def extract_sync(content: str) -> list[str]:
        # topic_extractor must be sync, so wrap your async function if needed.
        # asyncio.run() raises RuntimeError when the calling thread already
        # has a running event loop, so this wrapper only works when the
        # extractor is invoked outside that loop.
        return asyncio.run(extract(content))

    memory = InputLayerMemory(kg=kg, topic_extractor=extract_sync)

Either way, memory.astore(..., topics=[...]) always wins over the extractor, so you can override per-turn when needed. The ontology is what matters, not the extractor.
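The precedence rule is simple enough to state as code. This sketch assumes nothing about the library beyond the ordering described above; resolve_topics and toy_extractor are hypothetical names, and the empty-list fallback stands in for the built-in keyword extractor.

```python
def resolve_topics(content, topics=None, extractor=None):
    """Explicit topics win; otherwise fall back to the installed extractor."""
    if topics is not None:
        return list(topics)
    if extractor is not None:
        return list(extractor(content))
    # The real library would use its built-in keyword extractor here.
    return []


def toy_extractor(content):
    """Hypothetical extractor: pick known keywords out of the text."""
    known = ("python", "ml", "sql")
    words = content.lower().split()
    return [k for k in known if k in words]


text = "Help with Python ML"
from_extractor = resolve_topics(text, extractor=toy_extractor)
overridden = resolve_topics(text, topics=["billing"], extractor=toy_extractor)
print(from_extractor, overridden)
```

Passing topics=[] also counts as explicit in this sketch, which is one way to store a turn with no topics at all.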

Listing stored conversations

For admin tools, cleanup jobs, or replay pipelines, you often want to know what's in memory:

threads = await memory.alist_threads()  # sorted list of thread_ids
# or the blocking variant:
threads = memory.list_threads()

The list comes back sorted and contains every thread that has at least one stored turn.

Combining the checkpointer and memory in one graph

Real agents usually need both. The checkpointer keeps the graph resumable across restarts. The memory keeps the conversation coherent across many turns. They coexist happily because they use separate relations in the same KG and read the thread identifier from different places:

  • InputLayerCheckpointer reads config["configurable"]["thread_id"].
  • InputLayerMemory reads state["thread_id"] (configurable via thread_key).

Wire both up and you get a durable, stateful chat agent in about thirty lines:

from typing import TypedDict
from inputlayer import InputLayer
from inputlayer.integrations.langgraph import (
    InputLayerCheckpointer,
    InputLayerMemory,
)
from langgraph.graph import END, StateGraph

class ChatState(TypedDict, total=False):
    thread_id: str
    new_message: dict
    context: dict
    response: str

async def respond(state):
    ctx = state.get("context", {})
    # Call your LLM with ctx["topics"], ctx["recent"], ctx["relevant"]
    return {"response": "..."}

async with InputLayer("ws://localhost:8080/ws",
                      username="admin", password="admin") as il:
    kg = il.knowledge_graph("chat_agent")
    memory = InputLayerMemory(kg=kg)
    checkpointer = InputLayerCheckpointer(kg=kg)
    await memory.setup()
    await checkpointer.setup()

    graph = StateGraph(ChatState)
    graph.add_node("recall", memory.recall_node(state_key="context"))
    graph.add_node("respond", respond)
    graph.add_node("store", memory.store_node(state_key="new_message"))
    graph.set_entry_point("recall")
    graph.add_edge("recall", "respond")
    graph.add_edge("respond", "store")
    graph.add_edge("store", END)

    app = graph.compile(checkpointer=checkpointer)

    thread = "alex-session-42"
    config = {"configurable": {"thread_id": thread}}
    await app.ainvoke(
        {"thread_id": thread,
         "new_message": {"role": "user", "content": "Help me with Python ML"}},
        config=config,
    )
    # Restart the process; a fresh `app` with the same checkpointer and
    # memory will resume from the last checkpoint, and `recall` still
    # surfaces derived context from every prior turn.

The thread_id appears twice on purpose. The state copy is what the memory nodes read. The config copy is what the checkpointer uses to key its state. Using the same value everywhere keeps the two aligned so "resume a thread" and "remember a conversation" mean the same thing.
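A small helper can keep the two copies from drifting apart. The function below is a hypothetical convenience, not part of the SDK; it builds both the state and the config from a single thread ID, using the key names from the example above.

```python
def make_invocation(thread_id, role, content):
    """Build matching (state, config) so memory and checkpointer agree."""
    if not isinstance(thread_id, str) or not thread_id:
        raise ValueError("thread_id must be a non-empty string")
    # Read by the memory nodes (via thread_key, default "thread_id").
    state = {
        "thread_id": thread_id,
        "new_message": {"role": role, "content": content},
    }
    # Read by the checkpointer.
    config = {"configurable": {"thread_id": thread_id}}
    return state, config


state, config = make_invocation("alex-session-42", "user", "Help me with Python ML")
print(state["thread_id"] == config["configurable"]["thread_id"])
```

In the graph above you would then call app.ainvoke(state, config=config) instead of writing the thread ID out twice.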

Safe parameter binding

When interpolating user input into IQL, always use escape_iql:

from inputlayer.integrations.langgraph import escape_iql

user_query = state["user_input"]  # untrusted
q = f'?article(Id, T, "{escape_iql(user_query)}", E)'

escape_iql handles backslashes, double-quotes, newlines, tabs, nul bytes, and other control characters so the resulting string is a valid IQL literal. Never concatenate untrusted input into IQL without it.
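To see why this matters, compare a hostile input with and without escaping. The escaper below is a toy written for this illustration: it is NOT the SDK's escape_iql, and the exact escape sequences it emits (JSON-style backslash escapes, control characters dropped) are an assumption. In real code, call escape_iql.

```python
def toy_escape(value: str) -> str:
    """Illustrative only: NOT the SDK's escape_iql; the mapping is assumed."""
    out = []
    for ch in value:
        if ch == "\\":
            out.append("\\\\")
        elif ch == '"':
            out.append('\\"')
        elif ch == "\n":
            out.append("\\n")
        elif ch == "\t":
            out.append("\\t")
        elif ord(ch) < 0x20 or ch == "\x7f":
            continue  # drop nul bytes and other control characters
        else:
            out.append(ch)
    return "".join(out)


hostile = 'x", E) ?secret("'

# Unescaped: the first quote in the input closes the string literal early,
# so the rest of the input is parsed as query text.
unsafe = f'?article(Id, T, "{hostile}", E)'

# Escaped: the quotes stay inside the literal.
safe = f'?article(Id, T, "{toy_escape(hostile)}", E)'

print(unsafe)
print(safe)
```

The unsafe string contains four bare double quotes, which means two separate literals with attacker-controlled text between them. The safe string has exactly one literal.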

Sync and async

Every primitive supports both sync and async. The async methods (aput, aget_tuple, astore, arecall, adelete_thread, etc.) are the primary API. Sync wrappers (put, get_tuple, store, recall, delete_thread, etc.) work inside Jupyter, scripts, and any non-async context.

Examples

The SDK ships with 12 runnable examples at examples/langgraph/:

#   Example                 What it shows
1   Reasoning loop          Accumulate facts, rules decide when to stop
2   Investigation           Multi-step evidence gathering
3   Human-in-the-loop       Policy rules gate actions for approval
4   Branching pipeline      Route documents through parallel analysis
5   Self-correcting agent   Validation rules catch and fix errors
6   Collaborative planning  Multi-agent task decomposition
7   Event correlation       Pattern detection across event streams
8   Tool selection          Rules pick the right tool per context
9   Streaming aggregation   Threshold-based alerts from streaming data
10  Resumable graph         Checkpoint, crash, resume from persisted state
11  Semantic memory         Store turns as facts, recall derived context
12  Resumable chat          Checkpointer and memory together on one KG

Run them:

# Show the menu
uv run python -m examples.langgraph.runner

# Run specific examples
uv run python -m examples.langgraph.runner 1 10 11

# Run a range
uv run python -m examples.langgraph.runner 1-5

# Run all
uv run python -m examples.langgraph.runner --all

Examples marked [LLM] need an OpenAI-compatible server at localhost:1234 (LM Studio works). Set INPUTLAYER_URL, INPUTLAYER_USER, INPUTLAYER_PASSWORD to override the defaults.

Troubleshooting

ValueError: thread_id must be a non-empty string. The integration expects a non-empty string thread identifier everywhere it asks for one. Thread IDs, roles, and content are base64-encoded before they touch IQL, so any Unicode string is safe on the wire. The only requirement is that the ID itself is a non-empty str.

KeyError: thread_id. The checkpointer expects a config like {"configurable": {"thread_id": "..."}} on every call. Pass it to app.ainvoke and app.aget_state.

RuntimeError: KG returned an error. The KG rejected a query. The error message includes the offending IQL. Common causes: a relation you reference doesn't exist yet (call setup() first), or a rule has a syntax error.

Router always returns default. The branch queries are running, but no branch matches. Check the rules that derive those relations are installed (.rule list in the REPL), and that the facts those rules depend on are present.

Graph hangs. The checkpointer and memory have a 30-second default timeout on every KG call. If you see a hang longer than that, the underlying WebSocket connection is stuck. Enable logging.getLogger("inputlayer").setLevel("DEBUG") to see command traffic.

RuntimeError: 1/3 topic inserts failed. astore inserts the turn row first, then its topics concurrently. If the KG is overloaded or the connection drops mid-batch, some topic rows may be missing for that turn. The turn itself is already persisted. Blindly retrying astore with the same content creates a new turn with a fresh ID, so you get two rows for the same content. Prefer one of: (a) insert the missing topics directly with +memory_topic("<thread>", <turn_id>, "<topic>"), using the turn_id that astore returned, or (b) call adelete_thread and replay the conversation.

Known limitations

  • Non-atomic writes. aput_writes deletes existing writes for a task before inserting new ones. If the process crashes between delete and insert, writes for that task are lost. This is documented in the source and mirrors the tradeoff in other checkpointer implementations.
  • Soft thread-tracking limit. InputLayerMemory keeps per-thread locks and turn counters in memory. Once max_tracked_threads is exceeded, idle threads are evicted. If all tracked threads are active, excess threads are admitted with a warning and evicted once idle.
  • Topic extractor is keyword-based. The built-in extractor is for demos. Use an LLM in production and pass topics explicitly to astore().